Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix panic when nextKey twice (#40959) #41017

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,9 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
// the table when table is created.
needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
// split region by given ranges
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
needSplit = true
})
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
Expand Down Expand Up @@ -1856,7 +1859,8 @@ func nextKey(key []byte) []byte {

// in tikv <= 4.x, tikv will truncate the row key, so we should fetch the next valid row key
// See: https://github.com/tikv/tikv/blob/f7f22f70e1585d7ca38a59ea30e774949160c3e8/components/raftstore/src/coprocessor/split_observer.rs#L36-L41
if tablecodec.IsRecordKey(key) {
// we only do this for IntHandle, which is checked by length
if tablecodec.IsRecordKey(key) && len(key) == tablecodec.RecordRowKeyLen {
tableID, handle, _ := tablecodec.DecodeRecordKey(key)
nextHandle := handle.Next()
// int handle overflow, use the next table prefix as nextKey
Expand All @@ -1866,7 +1870,7 @@ func nextKey(key []byte) []byte {
return tablecodec.EncodeRowKeyWithHandle(tableID, nextHandle)
}

// if key is an index, directly append a 0x00 to the key.
// for index key and CommonHandle, directly append a 0x00 to the key.
res := make([]byte, 0, len(key)+1)
res = append(res, key...)
res = append(res, 0)
Expand Down
15 changes: 13 additions & 2 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,21 @@ func TestNextKey(t *testing.T) {
require.NoError(t, err)
nextHdl, err := tidbkv.NewCommonHandle(nextKeyBytes)
require.NoError(t, err)
expectNextKey := []byte(tablecodec.EncodeRowKeyWithHandle(1, nextHdl))
require.Equal(t, expectNextKey, nextKey(key))
nextValidKey := []byte(tablecodec.EncodeRowKeyWithHandle(1, nextHdl))
// nextKey may return a key that can't be decoded, but it must not be larger than the valid next key.
require.True(t, bytes.Compare(nextKey(key), nextValidKey) <= 0, "datums: %v", datums)
}

// a special case that when len(string datum) % 8 == 7, nextKey twice should not panic.
keyBytes, err := codec.EncodeKey(stmtCtx, nil, types.NewStringDatum("1234567"))
require.NoError(t, err)
h, err := tidbkv.NewCommonHandle(keyBytes)
require.NoError(t, err)
key = tablecodec.EncodeRowKeyWithHandle(1, h)
nextOnce := nextKey(key)
// should not panic
_ = nextKey(nextOnce)

// dIAAAAAAAAD/PV9pgAAAAAD/AAABA4AAAAD/AAAAAQOAAAD/AAAAAAEAAAD8
// a index key with: table: 61, index: 1, int64: 1, int64: 1
a := []byte{116, 128, 0, 0, 0, 0, 0, 0, 255, 61, 95, 105, 128, 0, 0, 0, 0, 255, 0, 0, 1, 3, 128, 0, 0, 0, 255, 0, 0, 0, 1, 3, 128, 0, 0, 255, 0, 0, 0, 0, 1, 0, 0, 0, 252}
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -381,7 +382,14 @@ func fetchTableRegionSizeStats(ctx context.Context, db *sql.DB, tableID int64) (
return stats, errors.Trace(err)
}

func (local *local) BatchSplitRegions(ctx context.Context, region *split.RegionInfo, keys [][]byte) (*split.RegionInfo, []*split.RegionInfo, error) {
func (local *local) BatchSplitRegions(
ctx context.Context,
region *split.RegionInfo,
keys [][]byte,
) (*split.RegionInfo, []*split.RegionInfo, error) {
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
failpoint.Return(nil, nil, errors.New("retryable error"))
})
region, newRegions, err := local.splitCli.BatchSplitRegionsWithOrigin(ctx, region, keys)
if err != nil {
return nil, nil, errors.Annotatef(err, "batch split regions failed")
Expand Down
2 changes: 1 addition & 1 deletion br/tests/lightning_local_backend/data/cpeng.a-schema.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create table a (c int);
create table a (c VARCHAR(20) PRIMARY KEY);
2 changes: 1 addition & 1 deletion br/tests/lightning_local_backend/data/cpeng.a.1.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
insert into a values (1);
insert into a values ('0000001');
2 changes: 1 addition & 1 deletion br/tests/lightning_local_backend/data/cpeng.a.2.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
insert into a values (2);
insert into a values ('0000002');
2 changes: 1 addition & 1 deletion br/tests/lightning_local_backend/data/cpeng.a.3.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
insert into a values (3),(4);
insert into a values ('0000003'),('0000004');
5 changes: 3 additions & 2 deletions br/tests/lightning_local_backend/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning


# First, verify that inject with not leader error is fine.
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")'
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader");github.com/pingcap/tidb/br/pkg/lightning/backend/local/failToSplit=2*return("")'
rm -f "$TEST_DIR/lightning-local.log"
run_sql 'DROP DATABASE IF EXISTS cpeng;'
run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml"
run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml" -L debug
grep -Eq "split regions.*retryable error" "$TEST_DIR/lightning-local.log"

# Check that everything is correctly imported
run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
Expand Down
2 changes: 2 additions & 0 deletions kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type Handle interface {
// IntValue returns the int64 value if IsInt is true, it panics if IsInt returns false.
IntValue() int64
// Next returns the minimum handle that is greater than this handle.
// The returned handle is not guaranteed to be able to decode.
Next() Handle
// Equal returns if the handle equals to another handle, it panics if the types are different.
Equal(h Handle) bool
Expand Down Expand Up @@ -279,6 +280,7 @@ func (ch *CommonHandle) IntValue() int64 {
}

// Next implements the Handle interface.
// Note that the returned encoded field is not guaranteed to be able to decode.
func (ch *CommonHandle) Next() Handle {
return &CommonHandle{
encoded: Key(ch.encoded).PrefixNext(),
Expand Down