importinto: integrate global sort(without merge-sort) part 1 #46998

Merged: 22 commits merged on Sep 19, 2023.
1 change: 1 addition & 0 deletions Makefile
@@ -385,6 +385,7 @@ mock_lightning: tools/bin/mockgen

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/dispatcher Dispatcher > disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor > disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/importinto MiniTaskExecutor > disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go
2 changes: 1 addition & 1 deletion br/cmd/tidb-lightning-ctl/main.go
@@ -200,7 +200,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
_, eID := backend.MakeUUID(table.TableName, engineID)
_, eID := backend.MakeUUID(table.TableName, int64(engineID))
engine := local.Engine{UUID: eID}
err := engine.Cleanup(cfg.TikvImporter.SortedKVDir)
if err != nil {
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/backend.go
@@ -36,7 +36,7 @@ const (
importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
)

func makeTag(tableName string, engineID int32) string {
func makeTag(tableName string, engineID int64) string {
return fmt.Sprintf("%s:%d", tableName, engineID)
}

Expand All @@ -48,7 +48,7 @@ func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger
}

// MakeUUID generates a UUID for the engine and a tag for the engine.
func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {
func MakeUUID(tableName string, engineID int64) (string, uuid.UUID) {
tag := makeTag(tableName, engineID)
engineUUID := uuid.NewSHA1(engineNamespace, []byte(tag))
return tag, engineUUID
@@ -229,7 +229,7 @@ func MakeEngineManager(ab Backend) EngineManager {
// OpenEngine opens an engine with the given table name and engine ID.
func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig,
tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
tag, engineUUID := MakeUUID(tableName, int64(engineID))
logger := makeLogger(log.FromContext(ctx), tag, engineUUID)

if err := be.backend.OpenEngine(ctx, config, engineUUID); err != nil {
@@ -298,7 +298,7 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
// resuming from a checkpoint.
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig,
tableName string, engineID int32) (*ClosedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
tag, engineUUID := MakeUUID(tableName, int64(engineID))
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID)
}
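
MakeUUID (and makeTag) now take an int64 engine ID instead of int32. The sketch below is illustrative only (the table name and ID values are made up): int32 engine IDs from lightning checkpoints are converted explicitly, while int64 values such as index IDs can be passed through directly, as the ddl/backfilling_import_cloud.go change further down does.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/backend"
)

func main() {
	// Checkpoint engine IDs are still int32, so those call sites convert
	// explicitly, exactly as the diffs above do.
	var engineID int32 = 0
	tag, engineUUID := backend.MakeUUID("`test`.`t`", int64(engineID))
	fmt.Println(tag, engineUUID)

	// Callers that key engines by an int64 value (for example an index ID
	// in the add-index path) can now pass it through without truncation.
	var indexID int64 = 42
	tag, engineUUID = backend.MakeUUID("`test`.`t`", indexID)
	fmt.Println(tag, engineUUID)
}
```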

3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
@@ -19,6 +19,9 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
68 changes: 68 additions & 0 deletions br/pkg/lightning/backend/external/util.go
@@ -239,3 +239,71 @@ func GetMaxOverlapping(points []Endpoint) int64 {
}
return maxWeight
}

// SortedKVMeta is the meta of sorted kv.
type SortedKVMeta struct {
MinKey []byte `json:"min_key"`
MaxKey []byte `json:"max_key"`
TotalKVSize uint64 `json:"total_kv_size"`
DataFiles []string `json:"data_files"`
StatFiles []string `json:"stat_files"`
}

// NewSortedKVMeta creates a SortedKVMeta from a WriterSummary.
func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta {
meta := &SortedKVMeta{
MinKey: summary.Min.Clone(),
MaxKey: summary.Max.Clone(),
TotalKVSize: summary.TotalSize,
}
for _, f := range summary.MultipleFilesStats {
for _, filename := range f.Filenames {
meta.DataFiles = append(meta.DataFiles, filename[0])
meta.StatFiles = append(meta.StatFiles, filename[1])
}
}
return meta
}

// Merge merges the other SortedKVMeta into this one.
func (m *SortedKVMeta) Merge(other *SortedKVMeta) {
m.MinKey = NotNilMin(m.MinKey, other.MinKey)
m.MaxKey = NotNilMax(m.MaxKey, other.MaxKey)
m.TotalKVSize += other.TotalKVSize

m.DataFiles = append(m.DataFiles, other.DataFiles...)
m.StatFiles = append(m.StatFiles, other.StatFiles...)
}

// MergeSummary merges the WriterSummary into this SortedKVMeta.
func (m *SortedKVMeta) MergeSummary(summary *WriterSummary) {
m.Merge(NewSortedKVMeta(summary))
}

// NotNilMin returns the smallest of a and b, ignoring nil values.
Review comment (Collaborator). Suggested change:
// NotNilMin returns the smallest of a and b, ignoring nil values.
// NotNilMin returns the smallest of a and b.

func NotNilMin(a, b []byte) []byte {
Review comment (Contributor, author): moved from ddl/backfilling_dispatcher.go.

if len(a) == 0 {
Review thread:
Collaborator: What if both a and b are nil? Will it return b (nil)?
Contributor, author (@D3Hunter, Sep 19, 2023): That's moved from add-index, unchanged.
Contributor: @Benjamin2037 Yes, returning a nil value is expected.

return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

// NotNilMax returns the largest of a and b, ignoring nil values.
func NotNilMax(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) > 0 {
return a
}
return b
}
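
A minimal, hypothetical usage sketch of the new helpers above (the file names and sizes are made up): it aggregates two subtask-level metas the way the dispatcher-side code merges writer summaries, and shows that the nil-in, nil-out behavior discussed in the review thread holds.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
)

func main() {
	// Collect per-subtask metadata into one task-level SortedKVMeta.
	total := &external.SortedKVMeta{}
	total.Merge(&external.SortedKVMeta{
		MinKey: []byte("a"), MaxKey: []byte("m"),
		TotalKVSize: 100, DataFiles: []string{"1/0/data"}, StatFiles: []string{"1/0/stat"},
	})
	total.Merge(&external.SortedKVMeta{
		MinKey: []byte("c"), MaxKey: []byte("z"),
		TotalKVSize: 200, DataFiles: []string{"1/1/data"}, StatFiles: []string{"1/1/stat"},
	})
	fmt.Printf("min=%s max=%s size=%d files=%d\n",
		total.MinKey, total.MaxKey, total.TotalKVSize, len(total.DataFiles))
	// min=a max=z size=300 files=2

	// When neither side has data, the merged key bounds stay nil, which the
	// review thread above confirms is the expected behavior.
	fmt.Println(external.NotNilMin(nil, nil) == nil) // true
}
```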
39 changes: 39 additions & 0 deletions br/pkg/lightning/backend/external/writer.go
@@ -24,6 +24,9 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
@@ -442,3 +445,39 @@ func (w *Writer) createStorageWriter(ctx context.Context) (
}
return dataPath, statPath, dataWriter, statsWriter, nil
}

// EngineWriter implements backend.EngineWriter interface.
type EngineWriter struct {
w *Writer
}

// NewEngineWriter creates a new EngineWriter.
func NewEngineWriter(w *Writer) *EngineWriter {
return &EngineWriter{w: w}
}

// AppendRows implements backend.EngineWriter interface.
func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
kvs := kv.Rows2KvPairs(rows)
if len(kvs) == 0 {
return nil
}
for _, item := range kvs {
err := e.w.WriteRow(ctx, item.Key, item.Val, nil)
if err != nil {
return err
}
}
return nil
}

// IsSynced implements backend.EngineWriter interface.
func (e *EngineWriter) IsSynced() bool {
// only used when saving checkpoint
return true
}

// Close implements backend.EngineWriter interface.
func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
return nil, e.w.Close(ctx)
}
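
The comments above state that EngineWriter implements backend.EngineWriter. An illustrative compile-time assertion (not part of the PR) would catch any drift between the adapter and the interface:

```go
package external_test

import (
	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
)

// If backend.EngineWriter ever gains a method that EngineWriter does not
// implement, this declaration fails at build time.
var _ backend.EngineWriter = (*external.EngineWriter)(nil)
```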
18 changes: 14 additions & 4 deletions br/pkg/lightning/backend/local/local.go
@@ -1469,7 +1469,7 @@ func (local *Backend) ImportEngine(
log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
if err == nil {
if kvRegionSplitSize > regionSplitSize {
regionSplitSize = kvRegionSplitSize
@@ -1549,7 +1549,7 @@

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int64, finalKeys int64, err error) {
return getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
}

// expose these variables to unit test.
Expand Down Expand Up @@ -1689,6 +1689,16 @@ func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64 {
return e.importedKVCount.Load()
}

// GetExternalEngineKVStatistics returns kv statistics of some engine.
func (local *Backend) GetExternalEngineKVStatistics(engineUUID uuid.UUID) (
totalKVSize int64, totalKVCount int64) {
v, ok := local.externalEngine[engineUUID]
if !ok {
return 0, 0
}
return v.KVStatistics()
}

// ResetEngine reset the engine and reclaim the space.
func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error {
// the only way to reset the engine + reclaim the space is to delete and reopen it 🤷
@@ -1927,8 +1937,8 @@ func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (
return splitSize, nested.Coprocessor.RegionSplitKeys, nil
}

// return region split size, region split keys, error
func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
// GetRegionSplitSizeKeys return region split size, region split keys, error
func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
regionSplitSize int64, regionSplitKeys int64, err error) {
stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
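
A compile-only sketch of how the two accessors touched in this file could be consumed together; inspectEngine and its package are hypothetical, not code from this PR. GetExternalEngineKVStatistics deliberately returns 0, 0 for an unknown engine UUID rather than an error.

```go
package localusage

import (
	"context"
	"fmt"

	"github.com/google/uuid"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
)

// inspectEngine reports the KV statistics of an external engine and the
// cluster's region split thresholds via the now-exported lookup.
func inspectEngine(ctx context.Context, be *local.Backend, engineUUID uuid.UUID) error {
	size, count := be.GetExternalEngineKVStatistics(engineUUID)
	fmt.Printf("engine %s: %d bytes in %d kv pairs\n", engineUUID, size, count)

	splitSize, splitKeys, err := be.GetRegionSplitSizeKeys(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("region split: %d bytes / %d keys\n", splitSize, splitKeys)
	return nil
}
```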
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local_test.go
@@ -1123,7 +1123,7 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {
}
return 0, 0, errors.New("invalid connection")
}
splitSize, splitKeys, err := getRegionSplitSizeKeys(ctx, cli, nil)
splitSize, splitKeys, err := GetRegionSplitSizeKeys(ctx, cli, nil)
require.NoError(t, err)
require.Equal(t, int64(1), splitSize)
require.Equal(t, int64(2), splitKeys)
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import.go
@@ -978,7 +978,7 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error
}
for tableName, engineIDs := range targetTables {
for _, engineID := range engineIDs {
_, eID := backend.MakeUUID(tableName, engineID)
_, eID := backend.MakeUUID(tableName, int64(engineID))
file := local.Engine{UUID: eID}
err := file.Exist(dir)
if err != nil {
48 changes: 12 additions & 36 deletions ddl/backfilling_dispatcher.go
@@ -333,12 +333,14 @@ func generateMergeSortPlan(
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
m := &BackfillSubTaskMeta{
MinKey: startKey,
MaxKey: endKey,
DataFiles: dataFiles,
StatFiles: statFiles,
SortedKVMeta: external.SortedKVMeta{
MinKey: startKey,
MaxKey: endKey,
DataFiles: dataFiles,
StatFiles: statFiles,
TotalKVSize: totalSize / uint64(len(instanceIDs)),
},
RangeSplitKeys: rangeSplitKeys,
TotalKVSize: totalSize / uint64(len(instanceIDs)),
}
metaBytes, err := json.Marshal(m)
if err != nil {
@@ -370,7 +372,9 @@ func generateMergePlan(
end = len(dataFiles)
}
m := &BackfillSubTaskMeta{
DataFiles: dataFiles[start:end],
SortedKVMeta: external.SortedKVMeta{
DataFiles: dataFiles[start:end],
},
}
metaBytes, err := json.Marshal(m)
if err != nil {
@@ -441,8 +445,8 @@ func getSummaryFromLastStep(
}
// Skip empty subtask.MinKey/MaxKey because it means
// no records need to be written in this subtask.
minKey = notNilMin(minKey, subtask.MinKey)
maxKey = notNilMax(maxKey, subtask.MaxKey)
minKey = external.NotNilMin(minKey, subtask.MinKey)
maxKey = external.NotNilMax(maxKey, subtask.MaxKey)
totalKVSize += subtask.TotalKVSize

for _, stat := range subtask.MultipleFilesStats {
@@ -468,31 +472,3 @@ func redactCloudStorageURI(
}
gTask.Meta = metaBytes
}

// notNilMin returns the smaller of a and b, ignoring nil values.
func notNilMin(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

// notNilMax returns the larger of a and b, ignoring nil values.
func notNilMax(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) > 0 {
return a
}
return b
}
8 changes: 2 additions & 6 deletions ddl/backfilling_dist_scheduler.go
@@ -45,12 +45,8 @@ type BackfillSubTaskMeta struct {
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`

DataFiles []string `json:"data_files"`
StatFiles []string `json:"stat_files"`
RangeSplitKeys [][]byte `json:"range_split_keys"`
MinKey []byte `json:"min_key"`
MaxKey []byte `json:"max_key"`
TotalKVSize uint64 `json:"total_kv_size"`
RangeSplitKeys [][]byte `json:"range_split_keys"`
external.SortedKVMeta `json:",inline"`
// MultipleFilesStats is the output of subtask, it will be used by the next subtask.
MultipleFilesStats []external.MultipleFilesStat `json:"multiple_files_stats"`
}
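
The struct above now embeds external.SortedKVMeta instead of repeating its five fields. The standalone sketch below uses simplified stand-in types (not the real ones) to show why the JSON wire format stays flat, so subtask metas written before this change still decode: encoding/json promotes the fields of an anonymous embedded struct, and the ",inline" option is essentially a documentation hint.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for external.SortedKVMeta and BackfillSubTaskMeta.
type SortedKVMeta struct {
	MinKey      []byte   `json:"min_key"`
	MaxKey      []byte   `json:"max_key"`
	TotalKVSize uint64   `json:"total_kv_size"`
	DataFiles   []string `json:"data_files"`
	StatFiles   []string `json:"stat_files"`
}

type BackfillSubTaskMeta struct {
	RangeSplitKeys [][]byte `json:"range_split_keys"`
	SortedKVMeta   `json:",inline"`
}

func main() {
	m := BackfillSubTaskMeta{
		SortedKVMeta: SortedKVMeta{TotalKVSize: 42, DataFiles: []string{"1/0/one-file"}},
	}
	out, _ := json.Marshal(m)
	// The embedded fields appear at the top level, e.g.
	// {"range_split_keys":null,"min_key":null,...,"total_kv_size":42,"data_files":["1/0/one-file"],"stat_files":null}
	fmt.Println(string(out))
}
```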
2 changes: 1 addition & 1 deletion ddl/backfilling_import_cloud.go
@@ -77,7 +77,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
if local == nil {
return errors.Errorf("local backend not found")
}
_, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, int32(m.index.ID))
_, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, m.index.ID)
err = local.CloseEngine(ctx, &backend.EngineConfig{
External: &backend.ExternalEngineConfig{
StorageURI: m.cloudStoreURI,
4 changes: 3 additions & 1 deletion disttask/framework/dispatcher/BUILD.bazel
@@ -37,8 +37,9 @@ go_test(
embed = [":dispatcher"],
flaky = True,
race = "off",
shard_count = 10,
shard_count = 11,
deps = [
"//disttask/framework/mock",
"//disttask/framework/proto",
"//disttask/framework/storage",
"//domain/infosync",
@@ -52,5 +53,6 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_mock//gomock",
],
)
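
With the generated Dispatcher mock and the gomock dependency wired in above, tests can stub the dispatcher lifecycle. The sketch below is illustrative only: it assumes mockgen's standard NewMockDispatcher constructor name and uses arbitrary expectations.

```go
package dispatcher_test

import (
	"testing"

	"github.com/pingcap/tidb/disttask/framework/mock"
	"go.uber.org/mock/gomock"
)

// TestDispatcherMockSketch shows the expected Init -> ExecuteTask -> Close
// call order against the generated mock; the expectations are illustrative.
func TestDispatcherMockSketch(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	d := mock.NewMockDispatcher(ctrl)
	d.EXPECT().Init().Return(nil)
	d.EXPECT().ExecuteTask()
	d.EXPECT().Close()

	if err := d.Init(); err != nil {
		t.Fatal(err)
	}
	d.ExecuteTask()
	d.Close()
}
```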
15 changes: 12 additions & 3 deletions disttask/framework/dispatcher/dispatcher.go
@@ -65,9 +65,13 @@ type TaskHandle interface {
// Dispatcher manages the lifetime of a task
// including submitting subtasks and updating the status of a task.
type Dispatcher interface {
// Init initializes the dispatcher, should be called before ExecuteTask.
// if Init returns error, dispatcher manager will fail the task directly,
// so the returned error should be a fatal error.
Init() error
// ExecuteTask start to schedule a task.
ExecuteTask()
// Close closes the dispatcher, not routine-safe, and should be called
// after ExecuteTask finished.
// Close closes the dispatcher, should be called if Init returns nil.
Close()
}

@@ -114,7 +118,12 @@ func NewBaseDispatcher(ctx context.Context, taskMgr *storage.TaskManager, server
}
}

// ExecuteTask start to schedule a task.
// Init implements the Dispatcher interface.
func (*BaseDispatcher) Init() error {
return nil
}

// ExecuteTask implements the Dispatcher interface.
func (d *BaseDispatcher) ExecuteTask() {
logutil.Logger(d.logCtx).Info("execute one task",
zap.String("state", d.Task.State), zap.Uint64("concurrency", d.Task.Concurrency))