-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
importinto: integrate global sort(without merge-sort) part 1 #46998
Conversation
Hi @D3Hunter. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/ok-to-test |
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## master #46998 +/- ##
================================================
- Coverage 73.0821% 72.6249% -0.4573%
================================================
Files 1336 1358 +22
Lines 398227 405296 +7069
================================================
+ Hits 291033 294346 +3313
- Misses 88451 92244 +3793
+ Partials 18743 18706 -37
Flags with carried forward coverage won't be shown. Click here to find out more.
|
7bc104f
to
65b43b4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Framework PART LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
15 / 52 files viewed
will review later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
21 / 52 files viewed
logger := logutil.BgLogger().With( | ||
zap.String("type", gTask.Type), | ||
zap.Int64("task-id", gTask.ID), | ||
zap.String("step", stepStr(gTask.Step)), | ||
zap.String("curr-step", stepStr(gTask.Step)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In master code, I have added nextStep
as a parameter of OnNextSubtasksBatch
. 😂 My consideration is, GetNextStep
can be called only once while OnNextSubtasksBatch
maybe called for multiple time for a subtask
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i see that GetNextStep
now has a handle
param, we can remove it, and move those logic into OnNextSubtasksBatch
, if there's no need to do merge-sort, just return 0 subtask metas, and let dispatcher call it again on next step.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If GetNextStep
is heavy to compute, it's better we let framework call it once and send the result to the argument of OnNextSubtasksBatch
if there's no need to do merge-sort, just return 0 subtask metas
Currently I think it's also acceptable. In future we may have a complex step transition and must use non-linear steps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
33 / 52 files viewed
// DecodeIndexID decodes indexID from the key. | ||
// this method simply extract index id part, and no other checking. | ||
// Caller should make sure the key is an index key. | ||
func DecodeIndexID(key kv.Key) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems in below DecodeTableID
we should also consider keyspace of APIv2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only that method checks it, seems we don't need to check it here for index kv
// `V2` use new encoding for RawKV & TxnKV to support more features.
//
// Key Encoding:
// TiDB: start with `m` or `t`, the same as `V1`.
// TxnKV: prefix with `x`, encoded as `MCE( x{keyspace id} + {user key} ) + timestamp`.
// RawKV: prefix with `r`, encoded as `MCE( r{keyspace id} + {user key} ) + timestamp`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, keyspace may be automatically adjusted in client-go, we don't need to care about it here.
@@ -147,6 +147,29 @@ func GetCachedKVStoreFrom(pdAddr string, tls *common.TLS) (tidbkv.Storage, error | |||
return kvStore, nil | |||
} | |||
|
|||
// GetRegionSplitSizeKeys gets the region split size and keys from PD. | |||
func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error) { | |||
tidbCfg := tidb.GetGlobalConfig() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe in future we can get the singleton PD client of this TiDB instance, and use the latest PD address, in case of PD node is changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
35 / 52 files viewed
switch gTask.Step { | ||
case proto.StepInit: | ||
if metrics, ok := metric.GetCommonMetric(ctx); ok { | ||
metrics.BytesCounter.WithLabelValues(metric.StateTotalRestore).Add(float64(taskMeta.Plan.TotalFileSize)) | ||
} | ||
if err := preProcess(ctx, taskHandle, gTask, taskMeta, logger); err != nil { | ||
jobStep := importer.JobStepImporting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and these logic seems more natural as a part of GetNextStep
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, that's import-into
job's step, exposed to user and has less steps
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
37 / 52 files viewed
// sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID} | ||
indexWriterFn := func(indexID int64) *external.Writer { | ||
builder := external.NewWriterBuilder(). | ||
SetOnCloseFunc(func(summary *external.WriterSummary) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to let the "WriterSummary" result be returned when Writer.Close, not passing it around using OnClose. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
op.sharedVars.mergeIndexSummary(indexID, summary) | ||
}) | ||
prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID))) | ||
writerID := path.Join("index", strconv.Itoa(int(indexID)), strconv.Itoa(int(workerID))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering two nodes are running same subtask (maybe the old one suffers network partition and framework dispatch a new node), we should use random writerID to avoid overwritten. Or maybe WriterBuilder should assign UUID internally 🤔
The result file path will be returned by OnClose, so caller seems don't need to care about the writerID
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering two nodes are running same subtask
makes sense, will change it to a uuid here
so caller seems don't need to care about the writerID
that would be better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
41 / 52 files viewed
@D3Hunter: The following test failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here. |
executor/importer/import.go
Outdated
cloudStorageURI, err1 := seCtx.GetSessionVars().GlobalVarsAccessor. | ||
GetGlobalSysVar(variable.TiDBCloudStorageURI) | ||
if err1 != nil { | ||
return errors.Trace(err1) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use variable.CloudStorageURI.Load()
instead?
|
||
// NotNilMin returns the smallest of a and b, ignoring nil values. | ||
func NotNilMin(a, b []byte) []byte { | ||
if len(a) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if both a and b nil?it will return b(nil)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's moved from add-index, didn't change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Benjamin2037 Yes, returning a nil
value is expected.
} | ||
|
||
// NotNilMin returns the smallest of a and b, ignoring nil values. | ||
func NotNilMin(a, b []byte) []byte { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved from ddl/backfilling_dispatcher.go
return err | ||
} | ||
} else { | ||
if err := importer.ProcessChunkWith(ctx, &chunkCheckpoint, sharedVars.TableImporter, dataWriter, indexWriter, sharedVars.Progress, logger); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why global sort use ProcessChunkWith, but local use ProcessChunk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's expected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean the name of function is not consistency.
// If we import the index in `cleanupSubtaskEnv`, the dispatcher will not wait for the import to complete. | ||
// Multiple index engines may suffer performance degradation due to range overlap. | ||
// These issues will be alleviated after we integrate s3 sorter. | ||
// engineID = -1, -2, -3, ... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this comment used for “// engineID = -1, -2, -3, ...”?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
each subtask will have a index engine, that's how we alloc it's id
m.Merge(NewSortedKVMeta(summary)) | ||
} | ||
|
||
// NotNilMin returns the smallest of a and b, ignoring nil values. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// NotNilMin returns the smallest of a and b, ignoring nil values. | |
// NotNilMin returns the smallest of a and b. |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: lance6716, tangenta The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/retest |
What problem does this PR solve?
Issue Number: ref #46704
Problem Summary:
when using global sort, we write index kv using IndexRouteWriter
we route kvs of different index to different writer in order to make
merge sort easier, else kv data of all subtasks will all be overlapped.
drawback of doing this is that the number of writers need to open will be
index-count * encode-concurrency, when the table has many indexes, and each
writer will take 256MiB buffer on default,
this will take a lot of memory, or even OOM.
we can adjust memory buf size later, but might write too many small files.
What is changed and how it works?
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.