Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: change MinRowID because ADD UNIQUE INDEX may be smaller #51955

Merged
merged 7 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"//pkg/tablecodec",
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/topsql/stmtstats",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -201,7 +202,7 @@ func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64
kvPairs := e.SessionCtx.TakeKvPairs()
for i := 0; i < len(kvPairs.Pairs); i++ {
var encoded [9]byte // The max length of encoded int64 is 9.
kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID)
kvPairs.Pairs[i].RowID = codec.EncodeComparableVarint(encoded[:0], rowID)
}
e.recordCache = record[:0]
return kvPairs, nil
Expand Down
9 changes: 0 additions & 9 deletions br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,6 @@ type dupDBIter struct {
err error
}

func (d *dupDBIter) Seek(key []byte) bool {
rawKey := d.keyAdapter.Encode(nil, key, common.ZeroRowID)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
d.curKey, d.err = d.keyAdapter.Decode(d.curKey[:0], d.iter.Key())
return d.err == nil
}

func (d *dupDBIter) Error() error {
if d.err != nil {
return d.err
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 28,
shard_count = 29,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
Expand All @@ -127,6 +127,8 @@ go_test(
"//pkg/store/driver/error",
"//pkg/store/mockstore",
"//pkg/testkit/testsetup",
"//pkg/types",
"//pkg/util/codec",
"//pkg/util/dbutil",
"//pkg/util/mock",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
Expand Down
7 changes: 2 additions & 5 deletions br/pkg/lightning/common/key_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package common

import (
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/util/codec"
)
Expand Down Expand Up @@ -114,8 +112,7 @@ func (DupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int {

var _ KeyAdapter = DupDetectKeyAdapter{}

// static vars for rowID
var (
MinRowID = EncodeIntRowID(math.MinInt64)
ZeroRowID = EncodeIntRowID(0)
// MinRowID is the minimum rowID value after DupDetectKeyAdapter.Encode().
MinRowID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0}
)
55 changes: 53 additions & 2 deletions br/pkg/lightning/common/key_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ import (
"math"
"sort"
"testing"
"time"
"unsafe"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
)

Expand All @@ -34,8 +38,9 @@ func randBytes(n int) []byte {
func TestNoopKeyAdapter(t *testing.T) {
keyAdapter := NoopKeyAdapter{}
key := randBytes(32)
require.Len(t, key, keyAdapter.EncodedLen(key, ZeroRowID))
encodedKey := keyAdapter.Encode(nil, key, ZeroRowID)
rowID := randBytes(8)
require.Len(t, key, keyAdapter.EncodedLen(key, rowID))
encodedKey := keyAdapter.Encode(nil, key, rowID)
require.Equal(t, key, encodedKey)

decodedKey, err := keyAdapter.Decode(nil, encodedKey)
Expand Down Expand Up @@ -159,3 +164,49 @@ func TestDecodeKeyDstIsInsufficient(t *testing.T) {
require.Equal(t, key, buf2[4:])
}
}

func TestMinRowID(t *testing.T) {
keyApapter := DupDetectKeyAdapter{}
key := []byte("key")
val := []byte("val")
shouldBeMin := keyApapter.Encode(key, val, MinRowID)

rowIDs := make([][]byte, 0, 20)

// DDL

rowIDs = append(rowIDs, kv.IntHandle(math.MinInt64).Encoded())
rowIDs = append(rowIDs, kv.IntHandle(-1).Encoded())
rowIDs = append(rowIDs, kv.IntHandle(0).Encoded())
rowIDs = append(rowIDs, kv.IntHandle(math.MaxInt64).Encoded())
handleData := []types.Datum{
types.NewIntDatum(math.MinInt64),
types.NewIntDatum(-1),
types.NewIntDatum(0),
types.NewIntDatum(math.MaxInt64),
types.NewBytesDatum(make([]byte, 1)),
types.NewBytesDatum(make([]byte, 7)),
types.NewBytesDatum(make([]byte, 8)),
types.NewBytesDatum(make([]byte, 9)),
types.NewBytesDatum(make([]byte, 100)),
}
for _, d := range handleData {
encodedKey, err := codec.EncodeKey(time.Local, nil, d)
require.NoError(t, err)
ch, err := kv.NewCommonHandle(encodedKey)
require.NoError(t, err)
rowIDs = append(rowIDs, ch.Encoded())
}

// lightning, IMPORT INTO, ...

numRowIDs := []int64{math.MinInt64, -1, 0, math.MaxInt64}
for _, id := range numRowIDs {
rowIDs = append(rowIDs, codec.EncodeComparableVarint(nil, id))
}

for _, id := range rowIDs {
bs := keyApapter.Encode(key, val, id)
require.True(t, bytes.Compare(bs, shouldBeMin) >= 0)
}
}
13 changes: 7 additions & 6 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,15 +461,16 @@ type KvPair struct {
Key []byte
// Val is the value of the KV pair
Val []byte
// RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It's
// often set to the file offset of the KvPair in the source file or the record
// handle.
// RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It has
// two sources:
//
// When the KvPair is generated from ADD INDEX, the RowID is the encoded handle.
//
// Otherwise, the RowID is related to the row number in the source files, and
// encode the number with `codec.EncodeComparableVarint`.
RowID []byte
}

// EncodeIntRowIDToBuf encodes an int64 row id to a buffer.
var EncodeIntRowIDToBuf = codec.EncodeComparableVarint

// EncodeIntRowID encodes an int64 row id.
func EncodeIntRowID(rowID int64) []byte {
return codec.EncodeComparableVarint(nil, rowID)
Expand Down
12 changes: 12 additions & 0 deletions tests/realtikvtest/addindextest/add_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,15 @@ func TestIssue51162(t *testing.T) {
tk.MustExec("alter table tl add index idx_16(`col_48`,(cast(`col_45` as signed array)),`col_46`(5));")
tk.MustExec("admin check table tl")
}

func TestAddUKWithSmallIntHandles(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists small;")
tk.MustExec("create database small;")
tk.MustExec("use small;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=1;`)
tk.MustExec("create table t (a bigint, b int, primary key (a) clustered)")
tk.MustExec("insert into t values (-9223372036854775808, 1),(-9223372036854775807, 1)")
tk.MustContainErrMsg("alter table t add unique index uk(b)", "Duplicate entry '1' for key 't.uk'")
}
Loading