Skip to content

Commit

Permalink
ddl: merge continuous key ranges in FLASHBACK CLUSTER job args (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored and hawkingrei committed Aug 1, 2024
1 parent 394e405 commit 0f5139a
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 49 deletions.
99 changes: 83 additions & 16 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []i
return flashbackIDs
}

// GetTableDataKeyRanges get keyRanges by `flashbackIDs`.
// getTableDataKeyRanges get keyRanges by `flashbackIDs`.
// This func will return all flashback table data key ranges.
func GetTableDataKeyRanges(nonFlashbackTableIDs []int64) []kv.KeyRange {
func getTableDataKeyRanges(nonFlashbackTableIDs []int64) []kv.KeyRange {
var keyRanges []kv.KeyRange

nonFlashbackTableIDs = append(nonFlashbackTableIDs, -1)
Expand All @@ -349,10 +349,52 @@ func GetTableDataKeyRanges(nonFlashbackTableIDs []int64) []kv.KeyRange {
return keyRanges
}

// GetFlashbackKeyRanges get keyRanges for flashback cluster.
type keyRangeMayExclude struct {
r kv.KeyRange
exclude bool
}

// appendContinuousKeyRanges merges not exclude continuous key ranges and appends
// to given []kv.KeyRange, assuming the gap between key ranges has no data.
//
// Precondition: schemaKeyRanges is sorted by start key. schemaKeyRanges are
// non-overlapping.
func appendContinuousKeyRanges(result []kv.KeyRange, schemaKeyRanges []keyRangeMayExclude) []kv.KeyRange {
var (
continuousStart, continuousEnd kv.Key
)

for _, r := range schemaKeyRanges {
if r.exclude {
if continuousStart != nil {
result = append(result, kv.KeyRange{
StartKey: continuousStart,
EndKey: continuousEnd,
})
continuousStart = nil
}
continue
}

if continuousStart == nil {
continuousStart = r.r.StartKey
}
continuousEnd = r.r.EndKey
}

if continuousStart != nil {
result = append(result, kv.KeyRange{
StartKey: continuousStart,
EndKey: continuousEnd,
})
}
return result
}

// getFlashbackKeyRanges get keyRanges for flashback cluster.
// It contains all non system table key ranges and meta data key ranges.
// The time complexity is O(nlogn).
func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.KeyRange, error) {
func getFlashbackKeyRanges(ctx context.Context, sess sessionctx.Context, flashbackTS uint64) ([]kv.KeyRange, error) {
is := sess.GetDomainInfoSchema().(infoschema.InfoSchema)
schemas := is.AllSchemas()

Expand All @@ -367,27 +409,52 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.Ke
}

schemaIDs := make(map[int64]struct{})
excludeSchemaIDs := make(map[int64]struct{})
for _, schema := range schemas {
if !filter.IsSystemSchema(schema.Name.L) {
if filter.IsSystemSchema(schema.Name.L) {
excludeSchemaIDs[schema.ID] = struct{}{}
} else {
schemaIDs[schema.ID] = struct{}{}
}
}
for _, schema := range snapshotSchemas {
if !filter.IsSystemSchema(schema.Name.L) {
if filter.IsSystemSchema(schema.Name.L) {
excludeSchemaIDs[schema.ID] = struct{}{}
} else {
schemaIDs[schema.ID] = struct{}{}
}
}

// The meta data key ranges.
schemaKeyRanges := make([]keyRangeMayExclude, 0, len(schemaIDs)+len(excludeSchemaIDs))
for schemaID := range schemaIDs {
metaStartKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID))
metaEndKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID + 1))
keyRanges = append(keyRanges, kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
schemaKeyRanges = append(schemaKeyRanges, keyRangeMayExclude{
r: kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
},
exclude: false,
})
}
for schemaID := range excludeSchemaIDs {
metaStartKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID))
metaEndKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID + 1))
schemaKeyRanges = append(schemaKeyRanges, keyRangeMayExclude{
r: kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
},
exclude: true,
})
}

slices.SortFunc(schemaKeyRanges, func(a, b keyRangeMayExclude) int {
return bytes.Compare(a.r.StartKey, b.r.StartKey)
})

keyRanges = appendContinuousKeyRanges(keyRanges, schemaKeyRanges)

startKey := tablecodec.EncodeMetaKeyPrefix([]byte("DBs"))
keyRanges = append(keyRanges, kv.KeyRange{
StartKey: startKey,
Expand All @@ -396,11 +463,11 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.Ke

var nonFlashbackTableIDs []int64
for _, db := range schemas {
tblInfos, err := is.SchemaTableInfos(context.Background(), db.Name)
if err != nil {
return nil, errors.Trace(err)
tbls, err2 := is.SchemaTableInfos(ctx, db.Name)
if err2 != nil {
return nil, errors.Trace(err2)
}
for _, table := range tblInfos {
for _, table := range tbls {
if !table.IsBaseTable() || table.ID > meta.MaxGlobalID {
continue
}
Expand All @@ -413,7 +480,7 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.Ke
}
}

return append(keyRanges, GetTableDataKeyRanges(nonFlashbackTableIDs)...), nil
return append(keyRanges, getTableDataKeyRanges(nonFlashbackTableIDs)...), nil
}

// SendPrepareFlashbackToVersionRPC prepares regions for flashback, the purpose is to put region into flashback state which region stop write
Expand Down Expand Up @@ -712,7 +779,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, errors.Trace(err)
}
job.Args[startTSOffset] = startTS
keyRanges, err = GetFlashbackKeyRanges(sess, flashbackTS)
keyRanges, err = getFlashbackKeyRanges(w.ctx, sess, flashbackTS)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
31 changes: 0 additions & 31 deletions pkg/ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/types"
Expand All @@ -38,34 +35,6 @@ import (
"github.com/tikv/client-go/v2/oracle"
)

func TestGetTableDataKeyRanges(t *testing.T) {
// case 1, empty flashbackIDs
keyRanges := ddl.GetTableDataKeyRanges([]int64{})
require.Len(t, keyRanges, 1)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))

// case 2, insert a execluded table ID
keyRanges = ddl.GetTableDataKeyRanges([]int64{3})
require.Len(t, keyRanges, 2)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))

// case 3, insert some execluded table ID
keyRanges = ddl.GetTableDataKeyRanges([]int64{3, 5, 9})
require.Len(t, keyRanges, 4)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(5))
require.Equal(t, keyRanges[2].StartKey, tablecodec.EncodeTablePrefix(6))
require.Equal(t, keyRanges[2].EndKey, tablecodec.EncodeTablePrefix(9))
require.Equal(t, keyRanges[3].StartKey, tablecodec.EncodeTablePrefix(10))
require.Equal(t, keyRanges[3].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
}

func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
originHook := dom.DDL().GetHook()
Expand Down
144 changes: 144 additions & 0 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/mock"
Expand Down Expand Up @@ -294,3 +295,146 @@ func TestCheckDuplicateConstraint(t *testing.T) {
err = checkDuplicateConstraint(constrNames, "u1", ast.ConstraintUniq)
require.EqualError(t, err, "[ddl:1061]Duplicate key name 'u1'")
}

func TestGetTableDataKeyRanges(t *testing.T) {
// case 1, empty flashbackIDs
keyRanges := getTableDataKeyRanges([]int64{})
require.Len(t, keyRanges, 1)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))

// case 2, insert a execluded table ID
keyRanges = getTableDataKeyRanges([]int64{3})
require.Len(t, keyRanges, 2)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))

// case 3, insert some execluded table ID
keyRanges = getTableDataKeyRanges([]int64{3, 5, 9})
require.Len(t, keyRanges, 4)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(5))
require.Equal(t, keyRanges[2].StartKey, tablecodec.EncodeTablePrefix(6))
require.Equal(t, keyRanges[2].EndKey, tablecodec.EncodeTablePrefix(9))
require.Equal(t, keyRanges[3].StartKey, tablecodec.EncodeTablePrefix(10))
require.Equal(t, keyRanges[3].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
}

func TestAppendContinuousKeyRanges(t *testing.T) {
cases := []struct {
input []keyRangeMayExclude
expect []kv.KeyRange
}{
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: true,
},
},
[]kv.KeyRange{},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: false,
},
},
[]kv.KeyRange{{StartKey: []byte{1}, EndKey: []byte{2}}},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: false,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: false,
},
},
[]kv.KeyRange{{StartKey: []byte{1}, EndKey: []byte{4}}},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: false,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{5}, EndKey: []byte{6}},
exclude: false,
},
},
[]kv.KeyRange{
{StartKey: []byte{1}, EndKey: []byte{2}},
{StartKey: []byte{5}, EndKey: []byte{6}},
},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{5}, EndKey: []byte{6}},
exclude: false,
},
},
[]kv.KeyRange{{StartKey: []byte{5}, EndKey: []byte{6}}},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: false,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{5}, EndKey: []byte{6}},
exclude: true,
},
},
[]kv.KeyRange{{StartKey: []byte{1}, EndKey: []byte{2}}},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: false,
},
{
r: kv.KeyRange{StartKey: []byte{5}, EndKey: []byte{6}},
exclude: true,
},
},
[]kv.KeyRange{{StartKey: []byte{3}, EndKey: []byte{4}}},
},
}

for i, ca := range cases {
ranges := appendContinuousKeyRanges([]kv.KeyRange{}, ca.input)
require.Equal(t, ca.expect, ranges, "case %d", i)
}
}
Loading

0 comments on commit 0f5139a

Please sign in to comment.