Skip to content

Commit

Permalink
Merge branch 'master' into scatter-elision
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen committed Aug 30, 2021
2 parents ddf0f7a + 00662f4 commit e88bc45
Show file tree
Hide file tree
Showing 91 changed files with 2,841 additions and 1,624 deletions.
20 changes: 0 additions & 20 deletions .github/workflows/issue_assigned.yml

This file was deleted.

6 changes: 5 additions & 1 deletion br/cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,11 @@ func importEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engi
return errors.Trace(err)
}

return errors.Trace(ce.Import(ctx))
regionSplitSize := int64(cfg.TikvImporter.RegionSplitSize)
if regionSplitSize == 0 {
regionSplitSize = int64(config.SplitRegionSize)
}
return errors.Trace(ce.Import(ctx, regionSplitSize))
}

func cleanupEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engine string) error {
Expand Down
15 changes: 8 additions & 7 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ func BuildBackupRangeAndSchema(
continue
}

idAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.RowIDAllocType)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.SequenceType)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.AutoRandomType)

tables, err := m.ListTables(dbInfo.ID)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -294,14 +290,19 @@ func BuildBackupRangeAndSchema(
zap.String("table", tableInfo.Name.O),
)

tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)

var globalAutoID int64
switch {
case tableInfo.IsSequence():
globalAutoID, err = seqAlloc.NextGlobalAutoID(tableInfo.ID)
globalAutoID, err = seqAlloc.NextGlobalAutoID()
case tableInfo.IsView() || !utils.NeedAutoID(tableInfo):
// no auto ID for views or table without either rowID nor auto_increment ID.
default:
globalAutoID, err = idAlloc.NextGlobalAutoID(tableInfo.ID)
globalAutoID, err = idAlloc.NextGlobalAutoID()
}
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -311,7 +312,7 @@ func BuildBackupRangeAndSchema(
if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
globalAutoRandID, err = randAlloc.NextGlobalAutoID(tableInfo.ID)
globalAutoRandID, err = randAlloc.NextGlobalAutoID()
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type AbstractBackend interface {
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
// it means there is duplicate detected. For this situation, all data in the engine must be imported.
// It's safe to reset or cleanup this engine.
ImportEngine(ctx context.Context, engineUUID uuid.UUID) error
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -310,7 +310,7 @@ func (be Backend) CheckDiskQuota(quota int64) (
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID) error {
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
Expand All @@ -320,7 +320,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx); err != nil {
if err := closedEngine.Import(ctx, regionSplitSize); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
Expand Down Expand Up @@ -436,12 +436,12 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context) error {
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) error {
var err error

for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid)
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize)
if !common.IsRetryableError(err) {
task.End(zap.ErrorLevel, err)
return err
Expand Down
18 changes: 9 additions & 9 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
Return(nil).
After(openCall)
importCall := s.mockBackend.EXPECT().
ImportEngine(ctx, engineUUID).
ImportEngine(ctx, engineUUID, gomock.Any()).
Return(nil).
After(closeCall)
s.mockBackend.EXPECT().
Expand All @@ -70,7 +70,7 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
c.Assert(err, IsNil)
closedEngine, err := engine.Close(ctx, nil)
c.Assert(err, IsNil)
err = closedEngine.Import(ctx)
err = closedEngine.Import(ctx, 1)
c.Assert(err, IsNil)
err = closedEngine.Cleanup(ctx)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -252,12 +252,12 @@ func (s *backendSuite) TestImportFailedNoRetry(c *C) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake unrecoverable import error"))

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
err = closedEngine.Import(ctx)
err = closedEngine.Import(ctx, 1)
c.Assert(err, ErrorMatches, "fake unrecoverable import error.*")
}

Expand All @@ -269,14 +269,14 @@ func (s *backendSuite) TestImportFailedWithRetry(c *C) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any()).
Return(errors.New("fake recoverable import error")).
MinTimes(2)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
err = closedEngine.Import(ctx)
err = closedEngine.Import(ctx, 1)
c.Assert(err, ErrorMatches, ".*fake recoverable import error")
}

Expand All @@ -288,16 +288,16 @@ func (s *backendSuite) TestImportFailedRecovered(c *C) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any()).
Return(errors.New("fake recoverable import error"))
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any()).
Return(nil)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
err = closedEngine.Import(ctx)
err = closedEngine.Import(ctx, 1)
c.Assert(err, IsNil)
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (importer *importer) Flush(_ context.Context, _ uuid.UUID) error {
return nil
}

func (importer *importer) ImportEngine(ctx context.Context, engineUUID uuid.UUID) error {
func (importer *importer) ImportEngine(ctx context.Context, engineUUID uuid.UUID, _ int64) error {
importer.lock.Lock()
defer importer.lock.Unlock()
req := &import_kvpb.ImportEngineRequest{
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s *importerSuite) TestCloseImportCleanupEngine(c *C) {

engine, err := s.engine.Close(s.ctx, nil)
c.Assert(err, IsNil)
err = engine.Import(s.ctx)
err = engine.Import(s.ctx, 1)
c.Assert(err, IsNil)
err = engine.Cleanup(s.ctx)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewPanickingAllocators(base int64) autoid.Allocators {
}

// Rebase implements the autoid.Allocator interface
func (alloc *panickingAllocator) Rebase(tableID, newBase int64, allocIDs bool) error {
func (alloc *panickingAllocator) Rebase(newBase int64, allocIDs bool) error {
// CAS
for {
oldBase := atomic.LoadInt64(alloc.base)
Expand Down
Loading

0 comments on commit e88bc45

Please sign in to comment.