Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto: integrate global sort (without merge-sort) part 1 #46998

Merged
merged 22 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ mock_lightning: tools/bin/mockgen

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/dispatcher Dispatcher > disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor > disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/importinto MiniTaskExecutor > disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go
Expand Down
2 changes: 1 addition & 1 deletion br/cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
_, eID := backend.MakeUUID(table.TableName, engineID)
_, eID := backend.MakeUUID(table.TableName, int64(engineID))
engine := local.Engine{UUID: eID}
err := engine.Cleanup(cfg.TikvImporter.SortedKVDir)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
)

// makeTag builds the human-readable tag "<tableName>:<engineID>"
// that identifies an engine; it is also the seed used to derive the
// engine UUID.
func makeTag(tableName string, engineID int64) string {
	tag := fmt.Sprintf("%s:%d", tableName, engineID)
	return tag
}

Expand All @@ -48,7 +48,7 @@ func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger
}

// MakeUUID generates a UUID for the engine and a tag for the engine.
func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {
func MakeUUID(tableName string, engineID int64) (string, uuid.UUID) {
tag := makeTag(tableName, engineID)
engineUUID := uuid.NewSHA1(engineNamespace, []byte(tag))
return tag, engineUUID
Expand Down Expand Up @@ -229,7 +229,7 @@ func MakeEngineManager(ab Backend) EngineManager {
// OpenEngine opens an engine with the given table name and engine ID.
func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig,
tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
tag, engineUUID := MakeUUID(tableName, int64(engineID))
logger := makeLogger(log.FromContext(ctx), tag, engineUUID)

if err := be.backend.OpenEngine(ctx, config, engineUUID); err != nil {
Expand Down Expand Up @@ -298,7 +298,7 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
// resuming from a checkpoint.
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig,
tableName string, engineID int32) (*ClosedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
tag, engineUUID := MakeUUID(tableName, int64(engineID))
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID)
}

Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
Expand Down
76 changes: 76 additions & 0 deletions br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,79 @@ func GetMaxOverlapping(points []Endpoint) int {
}
return int(maxWeight)
}

// SortedKVMeta is the meta of sorted kv.
// It records the key range covered by a set of sorted KV files, the
// total size of the KV pairs, and the files themselves.
type SortedKVMeta struct {
	MinKey      []byte   `json:"min_key"`       // smallest key among the sorted kv pairs
	MaxKey      []byte   `json:"max_key"`       // largest key among the sorted kv pairs
	TotalKVSize uint64   `json:"total_kv_size"` // accumulated size of all kv pairs
	DataFiles   []string `json:"data_files"`    // data file paths
	StatFiles   []string `json:"stat_files"`    // stat file paths, appended pairwise with DataFiles
}

// NewSortedKVMeta creates a SortedKVMeta from a WriterSummary: it
// clones the summary's min/max keys, copies the total size, and
// collects every data/stat file pair recorded in the summary.
func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta {
	var dataFiles, statFiles []string
	for _, stats := range summary.MultipleFilesStats {
		for _, pair := range stats.Filenames {
			// pair[0] is the data file, pair[1] its companion stat file.
			dataFiles = append(dataFiles, pair[0])
			statFiles = append(statFiles, pair[1])
		}
	}
	return &SortedKVMeta{
		MinKey:      summary.Min.Clone(),
		MaxKey:      summary.Max.Clone(),
		TotalKVSize: summary.TotalSize,
		DataFiles:   dataFiles,
		StatFiles:   statFiles,
	}
}

// Merge merges the other SortedKVMeta into this one: the key range is
// widened to cover both, sizes are summed, and the file lists are
// concatenated.
func (m *SortedKVMeta) Merge(other *SortedKVMeta) {
	m.TotalKVSize += other.TotalKVSize
	// NotNilMin/NotNilMax skip empty sides, so an empty meta merges
	// as a no-op on the key range.
	m.MinKey = NotNilMin(m.MinKey, other.MinKey)
	m.MaxKey = NotNilMax(m.MaxKey, other.MaxKey)
	m.DataFiles = append(m.DataFiles, other.DataFiles...)
	m.StatFiles = append(m.StatFiles, other.StatFiles...)
}

// MergeSummary merges the WriterSummary into this SortedKVMeta.
func (m *SortedKVMeta) MergeSummary(summary *WriterSummary) {
m.MinKey = NotNilMin(m.MinKey, summary.Min)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
m.MaxKey = NotNilMax(m.MaxKey, summary.Max)
m.TotalKVSize += summary.TotalSize
for _, f := range summary.MultipleFilesStats {
for _, filename := range f.Filenames {
m.DataFiles = append(m.DataFiles, filename[0])
m.StatFiles = append(m.StatFiles, filename[1])
}
}
}

// NotNilMin returns the smaller of a and b, treating an empty (or
// nil) slice as "absent": if one side is empty the other is returned,
// so the result is nil only when both inputs are empty.
func NotNilMin(a, b []byte) []byte {
	switch {
	case len(a) == 0:
		return b
	case len(b) == 0:
		return a
	}
	if bytes.Compare(b, a) <= 0 {
		return b
	}
	return a
}

// NotNilMax returns the larger of a and b, treating an empty (or
// nil) slice as "absent": if one side is empty the other is returned,
// so the result is nil only when both inputs are empty.
func NotNilMax(a, b []byte) []byte {
	switch {
	case len(a) == 0:
		return b
	case len(b) == 0:
		return a
	}
	if bytes.Compare(b, a) >= 0 {
		return b
	}
	return a
}
39 changes: 39 additions & 0 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -427,3 +430,39 @@ func (w *Writer) createStorageWriter(ctx context.Context) (
}
return dataPath, statPath, dataWriter, statsWriter, nil
}

// EngineWriter implements backend.EngineWriter interface.
// It adapts a *Writer so rows can be appended through the
// engine-writer API.
type EngineWriter struct {
	w *Writer // underlying writer that all rows are forwarded to
}

// NewEngineWriter creates a new EngineWriter that forwards all rows
// to w.
func NewEngineWriter(w *Writer) *EngineWriter {
	ew := &EngineWriter{}
	ew.w = w
	return ew
}

// AppendRows implements backend.EngineWriter interface.
func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
kvs := kv.Rows2KvPairs(rows)
if len(kvs) == 0 {
return nil
}
for _, item := range kvs {
err := e.w.WriteRow(ctx, item.Key, item.Val, nil)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
}
return nil
}

// IsSynced implements backend.EngineWriter interface.
// Always reports true; the result is only consulted when saving a
// checkpoint.
func (e *EngineWriter) IsSynced() bool {
	// only used when saving checkpoint
	return true
}

// Close implements backend.EngineWriter interface.
// It closes the underlying writer and reports no chunk flush status.
func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
	err := e.w.Close(ctx)
	return nil, err
}
20 changes: 16 additions & 4 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,7 @@ func (local *Backend) ImportEngine(
log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
if err == nil {
if kvRegionSplitSize > regionSplitSize {
regionSplitSize = kvRegionSplitSize
Expand Down Expand Up @@ -1549,7 +1549,7 @@ func (local *Backend) ImportEngine(

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int64, finalKeys int64, err error) {
return getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
}

// expose these variables to unit test.
Expand Down Expand Up @@ -1689,6 +1689,18 @@ func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64 {
return e.importedKVCount.Load()
}

// GetExternalEngineKVStatistics returns kv statistics of some engine.
func (local *Backend) GetExternalEngineKVStatistics(engineUUID uuid.UUID) (
totalKVSize int64, totalKVCount int64) {
v, ok := local.externalEngine[engineUUID]
if !ok {
// we get it after import, but before clean up, so this should not happen
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
// todo: return error
return 0, 0
}
return v.KVStatistics()
}

// ResetEngine reset the engine and reclaim the space.
func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error {
// the only way to reset the engine + reclaim the space is to delete and reopen it 🤷
Expand Down Expand Up @@ -1927,8 +1939,8 @@ func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (
return splitSize, nested.Coprocessor.RegionSplitKeys, nil
}

// return region split size, region split keys, error
func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
// GetRegionSplitSizeKeys return region split size, region split keys, error
func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
regionSplitSize int64, regionSplitKeys int64, err error) {
stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {
}
return 0, 0, errors.New("invalid connection")
}
splitSize, splitKeys, err := getRegionSplitSizeKeys(ctx, cli, nil)
splitSize, splitKeys, err := GetRegionSplitSizeKeys(ctx, cli, nil)
require.NoError(t, err)
require.Equal(t, int64(1), splitSize)
require.Equal(t, int64(2), splitKeys)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error
}
for tableName, engineIDs := range targetTables {
for _, engineID := range engineIDs {
_, eID := backend.MakeUUID(tableName, engineID)
_, eID := backend.MakeUUID(tableName, int64(engineID))
file := local.Engine{UUID: eID}
err := file.Exist(dir)
if err != nil {
Expand Down
44 changes: 9 additions & 35 deletions ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,14 @@ func generateMergeSortPlan(
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
m := &BackfillSubTaskMeta{
MinKey: startKey,
MaxKey: endKey,
DataFiles: dataFiles,
StatFiles: statFiles,
SortedKVMeta: external.SortedKVMeta{
MinKey: startKey,
MaxKey: endKey,
DataFiles: dataFiles,
StatFiles: statFiles,
TotalKVSize: totalSize / uint64(len(instanceIDs)),
},
RangeSplitKeys: rangeSplitKeys,
TotalKVSize: totalSize / uint64(len(instanceIDs)),
}
metaBytes, err := json.Marshal(m)
if err != nil {
Expand Down Expand Up @@ -386,8 +388,8 @@ func getSummaryFromLastStep(
}
// Skip empty subtask.MinKey/MaxKey because it means
// no records need to be written in this subtask.
minKey = notNilMin(minKey, subtask.MinKey)
maxKey = notNilMax(maxKey, subtask.MaxKey)
minKey = external.NotNilMin(minKey, subtask.MinKey)
maxKey = external.NotNilMax(maxKey, subtask.MaxKey)
totalKVSize += subtask.TotalKVSize

allDataFiles = append(allDataFiles, subtask.DataFiles...)
Expand All @@ -409,31 +411,3 @@ func redactCloudStorageURI(
}
gTask.Meta = metaBytes
}

// notNilMin returns the smaller of a and b, ignoring nil values.
func notNilMin(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

// notNilMax returns the larger of a and b, ignoring nil values.
func notNilMax(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) > 0 {
return a
}
return b
}
9 changes: 3 additions & 6 deletions ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/scheduler"
Expand All @@ -44,12 +45,8 @@ type BackfillSubTaskMeta struct {
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`

DataFiles []string `json:"data_files"`
StatFiles []string `json:"stat_files"`
RangeSplitKeys [][]byte `json:"range_split_keys"`
MinKey []byte `json:"min_key"`
MaxKey []byte `json:"max_key"`
TotalKVSize uint64 `json:"total_kv_size"`
RangeSplitKeys [][]byte `json:"range_split_keys"`
external.SortedKVMeta `json:",inline"`
}

// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
Expand Down
2 changes: 1 addition & 1 deletion ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
if local == nil {
return errors.Errorf("local backend not found")
}
_, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, int32(m.index.ID))
_, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, m.index.ID)
err = local.CloseEngine(ctx, &backend.EngineConfig{
External: &backend.ExternalEngineConfig{
StorageURI: m.cloudStoreURI,
Expand Down
4 changes: 3 additions & 1 deletion disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ go_test(
embed = [":dispatcher"],
flaky = True,
race = "off",
shard_count = 10,
shard_count = 11,
deps = [
"//disttask/framework/mock",
"//disttask/framework/proto",
"//disttask/framework/storage",
"//domain/infosync",
Expand All @@ -52,5 +53,6 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_mock//gomock",
],
)
Loading