Skip to content

Commit

Permalink
lightning: fix id too large after parallel import (#57398)
Browse files Browse the repository at this point in the history
close #56814
  • Loading branch information
D3Hunter authored Nov 19, 2024
1 parent 975ad1f commit ecca340
Show file tree
Hide file tree
Showing 33 changed files with 587 additions and 325 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ br_bins:

.PHONY: data_parsers
data_parsers: tools/bin/vfsgendev pkg/lightning/mydump/parser_generated.go lightning_web
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOPATH)/src" pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOMODCACHE)" pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
tools/bin/vfsgendev -source='"github.com/pingcap/tidb/lightning/pkg/web".Res' && mv res_vfsdata.go lightning/pkg/web/

.PHONY: build_dumpling
Expand Down
3 changes: 2 additions & 1 deletion Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

PROJECT=tidb
GOPATH ?= $(shell go env GOPATH)
GOMODCACHE ?= $(shell go env GOMODCACHE)
P=8

# Ensure GOPATH is set before running build process.
Expand Down Expand Up @@ -132,4 +133,4 @@ ifneq ("$(CI)", "")
endif
BAZEL_INSTRUMENTATION_FILTER := --instrument_test_targets --instrumentation_filter=//pkg/...,//br/...,//dumpling/...

NOGO_FLAG=true
NOGO_FLAG=true
2 changes: 1 addition & 1 deletion lightning/pkg/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
if err != nil {
return 0.0, false, errors.Trace(err)
}
idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0)
idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc())
tbl, err := tables.TableFromMeta(idAlloc, tableInfo)
if err != nil {
return 0.0, false, errors.Trace(err)
Expand Down
19 changes: 8 additions & 11 deletions lightning/pkg/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,20 +1979,17 @@ type deliverResult struct {
}

func saveCheckpoint(rc *Controller, t *TableImporter, engineID int32, chunk *checkpoints.ChunkCheckpoint) {
// We need to update the AllocBase every time we've finished a file.
// The AllocBase is determined by the maximum of the "handle" (_tidb_rowid
// or integer primary key), which can only be obtained by reading all data.

var base int64
if t.tableInfo.Core.ContainsAutoRandomBits() {
base = t.alloc.Get(autoid.AutoRandomType).Base() + 1
} else {
base = t.alloc.Get(autoid.RowIDAllocType).Base() + 1
}
// we save the XXXBase every time a chunk is finished.
// Note, it's possible some chunk with larger autoID range finished first, so
// the saved XXXBase is larger, when chunks with smaller autoID range finished
// it might have no effect on the saved XXXBase, but it's OK, we only need
// the largest.
rc.saveCpCh <- saveCp{
tableName: t.tableName,
merger: &checkpoints.RebaseCheckpointMerger{
AllocBase: base,
AutoRandBase: t.alloc.Get(autoid.AutoRandomType).Base(),
AutoIncrBase: t.alloc.Get(autoid.AutoIncrementType).Base(),
AutoRowIDBase: t.alloc.Get(autoid.RowIDAllocType).Base(),
},
}
rc.saveCpCh <- saveCp{
Expand Down
85 changes: 52 additions & 33 deletions lightning/pkg/importer/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableImporter) tableMetaMgr {

type tableMetaMgr interface {
InitTableMeta(ctx context.Context) error
AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error)
AllocTableRowIDs(ctx context.Context, requiredRowIDCnt int64) (*verify.KVChecksum, int64, error)
UpdateTableStatus(ctx context.Context, status metaStatus) error
UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error
CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (
Expand Down Expand Up @@ -177,7 +177,7 @@ func parseMetaStatus(s string) (metaStatus, error) {
}
}

func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) {
func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, requiredRowIDCnt int64) (*verify.KVChecksum, int64, error) {
conn, err := m.session.Conn(ctx)
if err != nil {
return nil, 0, errors.Trace(err)
Expand All @@ -188,22 +188,31 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
DB: conn,
Logger: m.tr.logger,
}
var newRowIDBase, newRowIDMax int64
curStatus := metaStatusInitial
// (myStartRowID, myEndRowID] is the range of row_id that current instance
// can use to encode the table.
var myStartRowID, myEndRowID int64
myStatus := metaStatusInitial
newStatus := metaStatusRowIDAllocated
var baseTotalKvs, baseTotalBytes, baseChecksum uint64
err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';")
if err != nil {
return nil, 0, errors.Annotate(err, "enable pessimistic transaction failed")
}

needAutoID := common.TableHasAutoID(m.tr.tableInfo.Core)
hasAutoID := common.TableHasAutoID(m.tr.tableInfo.Core)
tableChecksumingMsg := "Target table is calculating checksum. Please wait until the checksum is finished and try again."
doAllocTableRowIDsFn := func() error {
return exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error {
// lightning follows below calling sequence, so at most one client
// can execute the code after the FOR UPDATE part for some table,
// even though FOR UPDATE only lock rows that matches the condition:
// - insert into table_meta with key (table_id, task_id)
// - try lock with FOR UPDATE
rows, err := tx.QueryContext(
ctx,
common.SprintfWithIdentifiers("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status FROM %s.%s WHERE table_id = ? FOR UPDATE", m.schemaName, m.tableName),
common.SprintfWithIdentifiers(`
SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status
FROM %s.%s WHERE table_id = ? FOR UPDATE`, m.schemaName, m.tableName),
m.tr.tableInfo.ID,
)
if err != nil {
Expand Down Expand Up @@ -234,16 +243,16 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}

if metaTaskID == m.taskID {
curStatus = status
myStatus = status
baseChecksum = checksum
baseTotalKvs = totalKvs
baseTotalBytes = totalBytes
if status >= metaStatusRowIDAllocated {
if rowIDMax-rowIDBase != rawRowIDMax {
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", rawRowIDMax, rowIDMax-rowIDBase)
if rowIDMax-rowIDBase != requiredRowIDCnt {
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", requiredRowIDCnt, rowIDMax-rowIDBase)
}
newRowIDBase = rowIDBase
newRowIDMax = rowIDMax
myStartRowID = rowIDBase
myEndRowID = rowIDMax
break
}
continue
Expand All @@ -263,36 +272,43 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}

// no enough info are available, fetch row_id max for table
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
// TODO this is not right when AUTO_ID_CACHE=1 and have auto row id,
// the id allocators are separated in this case.
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if myStatus == metaStatusInitial {
// if the table don't have auto id, we still guarantee that the
// row ID is unique across all lightning instances.
// or if someone have already allocated the auto id, we can continue
// allocating from previous maxRowIDMax.
if !hasAutoID || maxRowIDMax > 0 {
myStartRowID = maxRowIDMax
} else {
// we are the first one to allocate the auto id, we need to
// fetch the max auto id base from the table, and allocate
// from there.
// as we only have one estimated requiredRowIDCount, but the
// table might have multiple allocators, so we use the max
// of them.
maxAutoIDBase, err := common.GetMaxAutoIDBase(m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
} else {
// Though we don't need auto ID, we still guarantee that the row ID is unique across all lightning instances.
newRowIDBase = maxRowIDMax
newRowIDMax = newRowIDBase + rawRowIDMax
myStartRowID = maxAutoIDBase
}
myEndRowID = myStartRowID + requiredRowIDCnt

// table contains no data, can skip checksum
if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted {
// if we are the first one to allocate, the table has auto-id,
// and our start is 0, it means the table is empty, so we move
// the state to next one directly without going through below
// checksum branch.
if hasAutoID && myStartRowID == 0 && newStatus < metaStatusRestoreStarted {
newStatus = metaStatusRestoreStarted
}

query := common.SprintfWithIdentifiers("UPDATE %s.%s SET row_id_base = ?, row_id_max = ?, status = ? WHERE table_id = ? AND task_id = ?", m.schemaName, m.tableName)
_, err := tx.ExecContext(ctx, query, newRowIDBase, newRowIDMax, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
_, err := tx.ExecContext(ctx, query, myStartRowID, myEndRowID, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
if err != nil {
return errors.Trace(err)
}

curStatus = newStatus
myStatus = newStatus
}
return nil
})
Expand Down Expand Up @@ -325,9 +341,12 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64

var checksum *verify.KVChecksum
// need to do checksum and update checksum meta since we are the first one.
if curStatus < metaStatusRestoreStarted {
// table contains data but haven't do checksum yet
if (newRowIDBase > 0 || !needAutoID) && m.needChecksum && baseTotalKvs == 0 {
if myStatus < metaStatusRestoreStarted {
// the table might have data if our StartRowID is not 0, or if the table
// don't have any auto id.
if (myStartRowID > 0 || !hasAutoID) && m.needChecksum && baseTotalKvs == 0 {
// if another instance finished import before below checksum logic,
// it will cause checksum mismatch, but it's very rare.
remoteCk, err := DoChecksum(ctx, m.tr.tableInfo)
if err != nil {
return nil, 0, errors.Trace(err)
Expand All @@ -354,11 +373,11 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
checksum = &ck
}
log.FromContext(ctx).Info("allocate table row_id base", zap.String("table", m.tr.tableName),
zap.Int64("row_id_base", newRowIDBase))
zap.Int64("startRowID", myStartRowID), zap.Int64("endRowID", myEndRowID))
if checksum != nil {
log.FromContext(ctx).Info("checksum base", zap.Any("checksum", checksum))
}
return checksum, newRowIDBase, nil
return checksum, myStartRowID, nil
}

func (m *dbTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error {
Expand Down
36 changes: 24 additions & 12 deletions lightning/pkg/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewTableImporter(
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(tableInfo.Core.SepAutoInc(), cp.AllocBase)
idAlloc := kv.NewPanickingAllocatorsWithBase(tableInfo.Core.SepAutoInc(), cp.AutoRandBase, cp.AutoIncrBase, cp.AutoRowIDBase)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
Expand Down Expand Up @@ -143,12 +143,15 @@ func (tr *TableImporter) importTable(
}

// fetch the max chunk row_id max value as the global max row_id
rowIDMax := int64(0)
requiredRowIDCnt := int64(0)
for _, engine := range cp.Engines {
if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > rowIDMax {
rowIDMax = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax
if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > requiredRowIDCnt {
requiredRowIDCnt = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax
}
}
tr.logger.Info("estimated required row id count",
zap.String("table", tr.tableName),
zap.Int64("count", requiredRowIDCnt))
versionStr, err := version.FetchVersion(ctx, rc.db)
if err != nil {
return false, errors.Trace(err)
Expand All @@ -163,7 +166,7 @@ func (tr *TableImporter) importTable(
return false, err
}

checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, rowIDMax)
checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, requiredRowIDCnt)
if err != nil {
return false, err
}
Expand All @@ -187,22 +190,31 @@ func (tr *TableImporter) importTable(
}
web.BroadcastTableCheckpoint(tr.tableName, cp)

// rebase the allocator so it exceeds the number of rows.
if tr.tableInfo.Core.ContainsAutoRandomBits() {
cp.AllocBase = max(cp.AllocBase, tr.tableInfo.Core.AutoRandID)
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
// rebase the allocator based on the max ID from table info.
ti := tr.tableInfo.Core
if ti.ContainsAutoRandomBits() {
cp.AutoRandBase = max(cp.AutoRandBase, ti.AutoRandID)
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AutoRandBase, false); err != nil {
return false, err
}
} else {
cp.AllocBase = max(cp.AllocBase, tr.tableInfo.Core.AutoIncID)
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
if ti.GetAutoIncrementColInfo() != nil && ti.SepAutoInc() {
cp.AutoIncrBase = max(cp.AutoIncrBase, ti.AutoIncID)
if err := tr.alloc.Get(autoid.AutoIncrementType).Rebase(context.Background(), cp.AutoIncrBase, false); err != nil {
return false, err
}
}
cp.AutoRowIDBase = max(cp.AutoRowIDBase, ti.AutoIncID)
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AutoRowIDBase, false); err != nil {
return false, err
}
}
rc.saveCpCh <- saveCp{
tableName: tr.tableName,
merger: &checkpoints.RebaseCheckpointMerger{
AllocBase: cp.AllocBase,
AutoRandBase: cp.AutoRandBase,
AutoIncrBase: cp.AutoIncrBase,
AutoRowIDBase: cp.AutoRowIDBase,
},
}
}
Expand Down
4 changes: 2 additions & 2 deletions lightning/pkg/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() {
mockEngineWriter.EXPECT().IsSynced().Return(true).AnyTimes()
mockEngineWriter.EXPECT().Close(gomock.Any()).Return(mockChunkFlushStatus, nil).AnyTimes()

tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0), s.tableInfo.Core)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc()), s.tableInfo.Core)
require.NoError(s.T(), err)
_, indexUUID := backend.MakeUUID("`db`.`table`", -1)
_, dataUUID := backend.MakeUUID("`db`.`table`", 0)
Expand Down Expand Up @@ -1445,7 +1445,7 @@ func (s *tableRestoreSuite) TestEstimate() {
controller := gomock.NewController(s.T())
defer controller.Finish()
mockEncBuilder := mock.NewMockEncodingBuilder(controller)
idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0)
idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc())
tbl, err := tables.TableFromMeta(idAlloc, s.tableInfo.Core)
require.NoError(s.T(), err)

Expand Down
15 changes: 15 additions & 0 deletions lightning/tests/lightning_checkpoint/config-file.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[lightning]
index-concurrency = 1
table-concurrency = 1

[tikv-importer]
backend = "local"
parallel-import = true

[checkpoint]
enable = true
driver = "file"

[mydumper]
read-block-size = 1
filter = ['cppk_tsr.tbl1', 'cppk_tsr.tbl2', 'cppk_tsr.tbl7', 'cppk_tsr.tbl8', 'cppk_tsr.tbl9']
1 change: 1 addition & 0 deletions lightning/tests/lightning_checkpoint/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ table-concurrency = 1

[tikv-importer]
backend = "local"
parallel-import = true

[checkpoint]
enable = true
Expand Down
Loading

0 comments on commit ecca340

Please sign in to comment.