Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix insert err after import for AUTO_ID_CACHE=1 and SHARD_ROW_ID_BITS (#52712) #52722

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_test(
name = "kv_test",
timeout = "short",
srcs = [
"allocator_test.go",
"base_test.go",
"session_internal_test.go",
"session_test.go",
Expand All @@ -57,7 +58,7 @@ go_test(
embed = [":kv"],
flaky = True,
race = "on",
shard_count = 17,
shard_count = 18,
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/common",
Expand Down
29 changes: 17 additions & 12 deletions br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,38 @@ import (
// panickingAllocator is an ID allocator which panics on all operations except Rebase
type panickingAllocator struct {
autoid.Allocator
base *int64
base atomic.Int64
ty autoid.AllocatorType
}

// NewPanickingAllocators creates a PanickingAllocator shared by all allocation types.
// we use this to collect the max id(either _tidb_rowid or auto_increment id or auto_random) used
// during import, and we will use this info to do ALTER TABLE xxx AUTO_RANDOM_BASE or AUTO_INCREMENT
// on post-process phase.
func NewPanickingAllocators(base int64) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
false,
&panickingAllocator{base: sharedBase, ty: autoid.RowIDAllocType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoIncrementType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoRandomType},
)
// TODO: support save all bases in checkpoint.
func NewPanickingAllocators(sepAutoInc bool, base int64) autoid.Allocators {
allocs := make([]autoid.Allocator, 0, 3)
for _, t := range []autoid.AllocatorType{
autoid.RowIDAllocType,
autoid.AutoIncrementType,
autoid.AutoRandomType,
} {
pa := &panickingAllocator{ty: t}
pa.base.Store(base)
allocs = append(allocs, pa)
}
return autoid.NewAllocators(sepAutoInc, allocs...)
}

// Rebase implements the autoid.Allocator interface
func (alloc *panickingAllocator) Rebase(_ context.Context, newBase int64, _ bool) error {
// CAS
for {
oldBase := atomic.LoadInt64(alloc.base)
oldBase := alloc.base.Load()
if newBase <= oldBase {
break
}
if atomic.CompareAndSwapInt64(alloc.base, oldBase, newBase) {
if alloc.base.CompareAndSwap(oldBase, newBase) {
break
}
}
Expand All @@ -61,7 +66,7 @@ func (alloc *panickingAllocator) Rebase(_ context.Context, newBase int64, _ bool

// Base implements the autoid.Allocator interface
func (alloc *panickingAllocator) Base() int64 {
return atomic.LoadInt64(alloc.base)
return alloc.base.Load()
}

func (alloc *panickingAllocator) GetType() autoid.AllocatorType {
Expand Down
35 changes: 35 additions & 0 deletions br/pkg/lightning/backend/kv/allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"testing"

"github.com/pingcap/tidb/meta/autoid"
"github.com/stretchr/testify/require"
)

func TestAllocator(t *testing.T) {
alloc := NewPanickingAllocators(true, 0)
require.NoError(t, alloc.Get(autoid.RowIDAllocType).Rebase(nil, 123, false))
// cannot revert back
require.NoError(t, alloc.Get(autoid.RowIDAllocType).Rebase(nil, 100, false))
require.NoError(t, alloc.Get(autoid.AutoIncrementType).Rebase(nil, 456, false))
require.NoError(t, alloc.Get(autoid.AutoRandomType).Rebase(nil, 789, false))

require.EqualValues(t, 123, alloc.Get(autoid.RowIDAllocType).Base())
require.EqualValues(t, 456, alloc.Get(autoid.AutoIncrementType).Base())
require.EqualValues(t, 789, alloc.Get(autoid.AutoRandomType).Base())
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestLogKVConvertFailed(t *testing.T) {
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
var tbl table.Table
tbl, err = tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err = tables.TableFromMeta(NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

var baseKVEncoder *BaseKVEncoder
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestEncode(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

logger := log.Logger{Logger: zap.NewNop()}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestDecode(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)
decoder, err := lkv.NewTableKVDecoder(tbl, "`test`.`c1`", &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestDecodeIndex(t *testing.T) {
State: model.StatePublic,
PKIsHandle: false,
}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
if err != nil {
fmt.Printf("error: %v", err.Error())
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestEncodeRowFormatV2(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

rows := []types.Datum{
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestEncodeTimestamp(t *testing.T) {
}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -342,7 +342,7 @@ func TestEncodeTimestamp(t *testing.T) {

func TestEncodeDoubleAutoIncrement(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id double not null auto_increment, unique key `u_id` (`id`));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestEncodeMissingAutoValue(t *testing.T) {
},
} {
tblInfo := mockTableInfo(t, testTblInfo.CreateStmt)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestEncodeMissingAutoValue(t *testing.T) {

func TestEncodeExpressionColumn(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id varchar(40) not null DEFAULT uuid(), unique key `u_id` (`id`));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -503,7 +503,7 @@ func mockTableInfo(t *testing.T, createSQL string) *model.TableInfo {

func TestDefaultAutoRandoms(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id bigint unsigned NOT NULL auto_random primary key clustered, a varchar(100));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down Expand Up @@ -541,7 +541,7 @@ func TestDefaultAutoRandoms(t *testing.T) {

func TestShardRowId(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (s varchar(16)) shard_row_id_bits = 3;")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down Expand Up @@ -703,7 +703,7 @@ func SetUpTest(b *testing.B) *benchSQL2KVSuite {
tableInfo.State = model.StatePublic

// Construct the corresponding KV encoder.
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tableInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0), tableInfo)
require.NoError(b, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestBuildDupTask(t *testing.T) {
info, err := ddl.MockTableInfo(mock.NewContext(), node[0].(*ast.CreateTableStmt), 1)
require.NoError(t, err)
info.State = model.StatePublic
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), info)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(info.SepAutoInc(), 0), info)
require.NoError(t, err)

// Test build duplicate detecting task.
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite {
cols = append(cols, col)
}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)
backendObj := tidb.NewTiDBBackend(context.Background(), db, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig(), log.L()))
return &mysqlSuite{
Expand Down Expand Up @@ -242,7 +242,7 @@ func testStrictMode(t *testing.T) {
ft.SetCharset(charset.CharsetASCII)
col1 := &model.ColumnInfo{ID: 2, Name: model.NewCIStr("s1"), State: model.StatePublic, Offset: 1, FieldType: ft}
tblInfo := &model.TableInfo{ID: 1, Columns: []*model.ColumnInfo{col0, col1}, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
if err != nil {
return 0.0, false, errors.Trace(err)
}
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo)
if err != nil {
return 0.0, false, errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
// TODO this is not right when AUTO_ID_CACHE=1 and have auto row id,
// the id allocators are separated in this case.
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
Expand Down
24 changes: 14 additions & 10 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewTableImporter(
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
idAlloc := kv.NewPanickingAllocators(tableInfo.Core.SepAutoInc(), cp.AllocBase)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
Expand Down Expand Up @@ -898,26 +898,30 @@ func (tr *TableImporter) postProcess(
if cp.Status < checkpoints.CheckpointStatusAlteredAutoInc {
tblInfo := tr.tableInfo.Core
var err error
// TODO why we have to rebase id for tidb backend??? remove it later.
if tblInfo.ContainsAutoRandomBits() {
ft := &common.GetAutoRandomColumn(tblInfo).FieldType
shardFmt := autoid.NewShardIDFormat(ft, tblInfo.AutoRandomBits, tblInfo.AutoRandomRangeBits)
maxCap := shardFmt.IncrementalBitsCapacity()
err = AlterAutoRandom(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap)
} else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil {
// only alter auto increment id iff table contains auto-increment column or generated handle.
// ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming.
// if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid
// allocator, even if the table has NO auto-increment column.
newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1
err = AlterAutoIncrement(ctx, rc.db, tr.tableName, newBase)

if err == nil && isLocalBackend(rc.cfg) {
if isLocalBackend(rc.cfg) {
// for TiDB version >= 6.5.0, a table might have separate allocators for auto_increment column and _tidb_rowid,
// especially when a table has auto_increment non-clustered PK, it will use both allocators.
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core)
err = common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.RowIDAllocType: tr.alloc.Get(autoid.RowIDAllocType).Base(),
autoid.AutoIncrementType: tr.alloc.Get(autoid.AutoIncrementType).Base(),
}, tr, tr.dbInfo.ID, tr.tableInfo.Core)
} else {
// only alter auto increment id iff table contains auto-increment column or generated handle.
// ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming.
// if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid
// allocator, even if the table has NO auto-increment column.
newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1
err = AlterAutoIncrement(ctx, rc.db, tr.tableName, newBase)
}
}
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() {
mockEngineWriter.EXPECT().IsSynced().Return(true).AnyTimes()
mockEngineWriter.EXPECT().Close(gomock.Any()).Return(mockChunkFlushStatus, nil).AnyTimes()

tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), s.tableInfo.Core)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0), s.tableInfo.Core)
require.NoError(s.T(), err)
_, indexUUID := backend.MakeUUID("`db`.`table`", -1)
_, dataUUID := backend.MakeUUID("`db`.`table`", 0)
Expand Down Expand Up @@ -1468,7 +1468,7 @@ func (s *tableRestoreSuite) TestEstimate() {
controller := gomock.NewController(s.T())
defer controller.Finish()
mockEncBuilder := mock.NewMockEncodingBuilder(controller)
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, s.tableInfo.Core)
require.NoError(s.T(), err)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
1778961125641936898 is 0001100010110000001000111011011111101011000111011110000000000010
bigger than the max increment part of sharded auto row id.
*/
CREATE TABLE nonclustered_cache1_shard_autorowid (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
v int,
PRIMARY KEY (id) NONCLUSTERED
) AUTO_ID_CACHE=1 SHARD_ROW_ID_BITS=4;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1778961125641936898,1
1778961125641936899,2
1778961125641936900,3
2 changes: 1 addition & 1 deletion br/tests/lightning_csv/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function run_with() {
run_sql 'SELECT id FROM csv.empty_strings WHERE b <> ""'
check_not_contains 'id:'

for table in clustered nonclustered clustered_cache1 nonclustered_cache1; do
for table in clustered nonclustered clustered_cache1 nonclustered_cache1 nonclustered_cache1_shard_autorowid; do
run_sql "select count(*) from auto_incr_id.$table"
check_contains 'count(*): 3'
# insert should work
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
create table specific_auto_inc (a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */, b int unique auto_increment) auto_increment=80000;
create table specific_auto_inc (
a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */,
b int unique auto_increment) auto_increment=80000;
4 changes: 2 additions & 2 deletions br/tests/lightning_tool_1472/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ run_lightning
run_sql 'insert into EE1472.pk values ();'
run_sql 'select count(a), max(a) from EE1472.pk;'
check_contains 'count(a): 3'
check_contains 'max(a): 6'
check_contains 'max(a): 5'

run_sql 'insert into EE1472.notpk (a) values (3333);'
run_sql 'select b from EE1472.notpk where a = 3333;'
check_contains 'b: 11'
check_contains 'b: 10'
2 changes: 1 addition & 1 deletion disttask/loaddata/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, gT
}

func generateSubtaskMetas(ctx context.Context, taskMeta *TaskMeta) ([]*SubtaskMeta, error) {
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(taskMeta.Plan.TableInfo.SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion disttask/loaddata/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ImportScheduler struct {
func (s *ImportScheduler) InitSubtaskExecEnv(ctx context.Context) error {
logutil.BgLogger().Info("InitSubtaskExecEnv", zap.Any("taskMeta", s.taskMeta))

idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(s.taskMeta.Plan.TableInfo.SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, s.taskMeta.Plan.TableInfo)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func prepareSortDir(e *LoadDataController, jobID int64) (string, error) {

// NewTableImporter creates a new table importer.
func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableImporter, err error) {
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(e.Table.Meta().SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta())
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", e.Table.Meta().Name)
Expand Down