Skip to content

Commit

Permalink
lightning: fix id too large after parallel import (#57398) (#57928)
Browse files Browse the repository at this point in the history
close #56814
  • Loading branch information
ti-chi-bot authored Dec 6, 2024
1 parent 9f4dec9 commit f8d5c54
Show file tree
Hide file tree
Showing 32 changed files with 568 additions and 311 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ br_bins:
@rm tmp_parser.go

data_parsers: tools/bin/vfsgendev br/pkg/lightning/mydump/parser_generated.go br_web
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOPATH)/src" br/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
PATH="$(GOPATH)/bin":"$(PATH)":"$(TOOLS)" protoc -I. -I"$(GOMODCACHE)" br/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto --gogofaster_out=.
tools/bin/vfsgendev -source='"github.com/pingcap/tidb/br/pkg/lightning/web".Res' && mv res_vfsdata.go br/pkg/lightning/web/

build_dumpling:
Expand Down
1 change: 1 addition & 0 deletions Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

PROJECT=tidb
GOPATH ?= $(shell go env GOPATH)
GOMODCACHE ?= $(shell go env GOMODCACHE)
P=8

# Ensure GOPATH is set before running build process.
Expand Down
26 changes: 17 additions & 9 deletions br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,28 @@ type panickingAllocator struct {
ty autoid.AllocatorType
}

// NewPanickingAllocators creates a PanickingAllocator shared by all allocation types.
// NewPanickingAllocators creates a PanickingAllocator with default base values.
func NewPanickingAllocators(sepAutoInc bool) autoid.Allocators {
return NewPanickingAllocatorsWithBase(sepAutoInc, 0, 0, 0)
}

// NewPanickingAllocatorsWithBase creates a PanickingAllocator shared by all allocation types.
// we use this to collect the max id(either _tidb_rowid or auto_increment id or auto_random) used
// during import, and we will use this info to do ALTER TABLE xxx AUTO_RANDOM_BASE or AUTO_INCREMENT
// on post-process phase.
// TODO: support save all bases in checkpoint.
func NewPanickingAllocators(sepAutoInc bool, base int64) autoid.Allocators {
func NewPanickingAllocatorsWithBase(sepAutoInc bool, autoRandBase, autoIncrBase,
autoRowIDBase int64) autoid.Allocators {
allocs := make([]autoid.Allocator, 0, 3)
for _, t := range []autoid.AllocatorType{
autoid.RowIDAllocType,
autoid.AutoIncrementType,
autoid.AutoRandomType,
for _, t := range []struct {
Type autoid.AllocatorType
Base int64
}{
{Type: autoid.AutoRandomType, Base: autoRandBase},
{Type: autoid.AutoIncrementType, Base: autoIncrBase},
{Type: autoid.RowIDAllocType, Base: autoRowIDBase},
} {
pa := &panickingAllocator{ty: t}
pa.base.Store(base)
pa := &panickingAllocator{ty: t.Type}
pa.base.Store(t.Base)
allocs = append(allocs, pa)
}
return autoid.NewAllocators(sepAutoInc, allocs...)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func TestAllocator(t *testing.T) {
alloc := NewPanickingAllocators(true, 0)
alloc := NewPanickingAllocators(true)
require.NoError(t, alloc.Get(autoid.RowIDAllocType).Rebase(nil, 123, false))
// cannot revert back
require.NoError(t, alloc.Get(autoid.RowIDAllocType).Rebase(nil, 100, false))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestLogKVConvertFailed(t *testing.T) {
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
var tbl table.Table
tbl, err = tables.TableFromMeta(NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err = tables.TableFromMeta(NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)

var baseKVEncoder *BaseKVEncoder
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/kv/kv2sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestIterRawIndexKeysClusteredPK(t *testing.T) {
require.NoError(t, err)
info.State = model.StatePublic
require.True(t, info.IsCommonHandle)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(info.SepAutoInc(), 0), info)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(info.SepAutoInc()), info)
require.NoError(t, err)

sessionOpts := &encode.SessionOptions{
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestIterRawIndexKeysIntPK(t *testing.T) {
require.NoError(t, err)
info.State = model.StatePublic
require.True(t, info.PKIsHandle)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(info.SepAutoInc(), 0), info)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(info.SepAutoInc()), info)
require.NoError(t, err)

sessionOpts := &encode.SessionOptions{
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestEncode(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)

logger := log.Logger{Logger: zap.NewNop()}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestDecode(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)
decoder, err := lkv.NewTableKVDecoder(tbl, "`test`.`c1`", &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestDecodeIndex(t *testing.T) {
State: model.StatePublic,
PKIsHandle: false,
}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
if err != nil {
fmt.Printf("error: %v", err.Error())
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestEncodeRowFormatV2(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)

rows := []types.Datum{
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestEncodeTimestamp(t *testing.T) {
}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -342,7 +342,7 @@ func TestEncodeTimestamp(t *testing.T) {

func TestEncodeDoubleAutoIncrement(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id double not null auto_increment, unique key `u_id` (`id`));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestEncodeMissingAutoValue(t *testing.T) {
},
} {
tblInfo := mockTableInfo(t, testTblInfo.CreateStmt)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestEncodeMissingAutoValue(t *testing.T) {

func TestEncodeExpressionColumn(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id varchar(40) not null DEFAULT uuid(), unique key `u_id` (`id`));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -503,7 +503,7 @@ func mockTableInfo(t *testing.T, createSQL string) *model.TableInfo {

func TestDefaultAutoRandoms(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id bigint unsigned NOT NULL auto_random primary key clustered, a varchar(100));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down Expand Up @@ -541,7 +541,7 @@ func TestDefaultAutoRandoms(t *testing.T) {

func TestShardRowId(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (s varchar(16)) shard_row_id_bits = 3;")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down Expand Up @@ -703,7 +703,7 @@ func SetUpTest(b *testing.B) *benchSQL2KVSuite {
tableInfo.State = model.StatePublic

// Construct the corresponding KV encoder.
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0), tableInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tableInfo.SepAutoInc()), tableInfo)
require.NoError(b, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestBuildDupTask(t *testing.T) {
info, err := ddl.MockTableInfo(mock.NewContext(), node[0].(*ast.CreateTableStmt), 1)
require.NoError(t, err)
info.State = model.StatePublic
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(info.SepAutoInc(), 0), info)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(info.SepAutoInc()), info)
require.NoError(t, err)

// Test build duplicate detecting task.
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite {
cols = append(cols, col)
}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)
cfg := config.NewConfig()
cfg.Conflict.Strategy = config.ReplaceOnDup
Expand Down Expand Up @@ -286,7 +286,7 @@ func testStrictMode(t *testing.T) {
ft.SetCharset(charset.CharsetASCII)
col1 := &model.ColumnInfo{ID: 2, Name: model.NewCIStr("s1"), State: model.StatePublic, Offset: 1, FieldType: ft}
tblInfo := &model.TableInfo{ID: 1, Columns: []*model.ColumnInfo{col0, col1}, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc()), tblInfo)
require.NoError(t, err)

ctx := context.Background()
Expand Down
3 changes: 1 addition & 2 deletions br/pkg/lightning/checkpoints/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ go_library(
"//pkg/parser/model",
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/mathutil",
"//pkg/util/sqlexec",
"@com_github_joho_sqltocsv//:sqltocsv",
"@com_github_pingcap_errors//:errors",
Expand All @@ -42,7 +41,7 @@ go_test(
embed = [":checkpoints"],
flaky = True,
race = "on",
shard_count = 23,
shard_count = 24,
deps = [
"//br/pkg/lightning/checkpoints/checkpointspb",
"//br/pkg/lightning/config",
Expand Down
Loading

0 comments on commit f8d5c54

Please sign in to comment.