ddl: initial support for parallel DDL #6955
util/admin/admin.go
Outdated
    if err != nil {
        return nil, errors.Trace(err)
    }
    return append(generalJobs, addIdxJobs...), nil
The returned jobs may not be sorted by job ID. Should we return sorted jobs? The older function naturally returned sorted jobs.
I think it can be done in the next PR. I have added a "TODO" for now.
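For reference, sorting the merged list could look roughly like the sketch below. The `Job` type and `mergeJobsSorted` are hypothetical stand-ins for illustration, not code from this PR; the merge copies into a fresh slice so the appended result never shares a backing array with `generalJobs`.

```go
package main

import "sort"

// Job is a hypothetical stand-in for model.Job; only the ID matters here.
type Job struct {
	ID int64
}

// mergeJobsSorted concatenates the two job lists into a new slice and
// sorts the result by ascending job ID.
func mergeJobsSorted(generalJobs, addIdxJobs []*Job) []*Job {
	jobs := make([]*Job, 0, len(generalJobs)+len(addIdxJobs))
	jobs = append(jobs, generalJobs...)
	jobs = append(jobs, addIdxJobs...)
	sort.Slice(jobs, func(i, j int) bool { return jobs[i].ID < jobs[j].ID })
	return jobs
}
```

Since each queue is presumably already ordered by ID, a linear two-way merge would also work; `sort.Slice` is simply the shortest way to express it.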
ddl/ddl.go
Outdated
    for _, worker := range d.workers {
        worker.wg.Add(1)
        go worker.start(d.ddlCtx)
        // TODO: Add the type of DDL worker.
        metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc()
        // For every start, we will send a fake job to let worker
What is "For every start"?
It's the original comment. I think it means "For each call to the start function".
ddl/ddl.go
Outdated
    @@ -427,6 +423,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
        return 1 * time.Second
    }
    func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
        // If the workes don't run, we needn't to notice workers.
workers or works?
"notify" is better than "notice".
    @@ -282,41 +293,56 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
        return errors.Trace(err)
    }
    func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
Can a DDL job depend on multiple jobs?
When a job depends on multiple jobs, we only record the maximum job ID among them.
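A minimal sketch of that rule, assuming the candidate IDs have already been filtered down to jobs the current job conflicts with. `dependencyID` and `noDependency` are hypothetical names used only for this illustration.

```go
package main

// noDependency is a hypothetical name for the zero value that means
// "this job does not depend on any other job".
const noDependency int64 = 0

// dependencyID returns the largest ID in candidateIDs that is smaller than
// curID, or noDependency if no such ID exists.
func dependencyID(curID int64, candidateIDs []int64) int64 {
	dep := noDependency
	for _, id := range candidateIDs {
		if id < curID && id > dep {
			dep = id
		}
	}
	return dep
}
```

Recording only the maximum relies on job IDs reflecting submission order within a queue: once the job with the largest such ID has finished, the earlier jobs in the same queue are assumed to have finished too.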
/run-all-tests
/run-common-test
ddl/ddl.go
Outdated
    @@ -427,6 +423,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
        return 1 * time.Second
    }
    func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
        // If the workers don't run, we needn't to notice workers.
s/notice/notify/
    wg       sync.WaitGroup
    id       int
    tp       workerType
    ddlJobCh chan struct{}
What is this used for?
ddlJobCh used to be a member of ddl, but now that we have two kinds of workers, each kind needs its own ddlJobCh. So ddlJobCh moves from ddl to worker.
util/admin/admin.go
Outdated
    @@ -134,6 +134,22 @@ func GetDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
        return jobs, nil
    }
    // GetDDLJobs returns the DDL jobs and an error.
There is no need to mention the error.
Other functions also mention the error.
This comment provides nothing more than the function name.
reset
ddl/ddl.go
Outdated
    d.workers[0] = newWorker(generalWorker, 0, d.store, ctxPool)
    d.workers = make(map[workerType]*worker, 2)
    d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool)
    d.workers[addIdxWorker] = newWorker(addIdxWorker, 0, d.store, ctxPool)
Is the worker ID always 0?
    @@ -61,6 +61,7 @@ func newWorker(tp workerType, id int, store kv.Storage, ctxPool *pools.ResourceP
        worker := &worker{
            id:       id,
            tp:       tp,
            ddlJobCh: make(chan struct{}, 1),
Why does it need to be make(chan struct{}, 1) rather than make(chan struct{})?
We need to push info to the channel.
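To spell out the difference: the sketch below assumes the notification is a non-blocking send (the body of asyncNotify is not shown in this diff, so that is an assumption, and `notify` is a hypothetical helper). With capacity 1 the signal is kept until the worker reads it; with an unbuffered channel it is dropped whenever the worker is not already waiting in a receive.

```go
package main

// notify performs a non-blocking send and reports whether the signal
// was placed on the channel.
func notify(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		// Channel is full (buffered) or nobody is receiving (unbuffered).
		return false
	}
}
```

Calling `notify` on `make(chan struct{}, 1)` succeeds once and leaves one pending signal; further calls are no-ops until the worker drains it. Calling it on `make(chan struct{})` with no receiver returns false and the notification is lost.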
ddl/ddl_worker.go
Outdated
    for {
        select {
        case <-ticker.C:
            log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
%s to print a time? What does the result look like?
Why not print the worker type?
It's old code. I will handle it.
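On the %s question: if checkTime is a time.Duration (an assumption; its declaration is not in the quoted diff), %s goes through Duration's String method and prints a compact form such as "2s". The sketch below also adds the worker type as suggested; `waitMessage` and `exampleMessage` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// waitMessage formats the log line with both the worker type and the wait
// duration. %s on a time.Duration uses Duration.String().
func waitMessage(workerType string, checkTime time.Duration) string {
	return fmt.Sprintf("[ddl] %s worker waits %s to check DDL status again", workerType, checkTime)
}

// exampleMessage returns "[ddl] add index worker waits 2s to check DDL status again".
func exampleMessage() string {
	return waitMessage("add index", 2*time.Second)
}
```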
ddl/ddl_worker.go
Outdated
    @@ -149,6 +146,15 @@ func asyncNotify(ch chan struct{}) {
    // buildJobDependence sets the curjob's dependency-ID.
    // The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
    func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
        switch curJob.Type {
It's hard to understand here.
It's hard for me to understand as well.
Please add comments.
I don't think adding comments helps, @shenli. meta.Meta should not store the information about the queue key. If the caller has to modify the status in meta.Meta before calling its method, why not provide that status as an argument?
ddl/ddl_worker.go
Outdated
        t.SetJobListKey(meta.AddIndexJobListKey)
        defer t.SetJobListKey(meta.DefaultJobListKey)
    }
    jobs, err := t.GetAllDDLJobs()
I suggest renaming GetAllDDLJobs to GetDDLJobsInQueue and passing the queue, rather than changing meta's jobListKey status, which is very tricky.
GetAllDDLJobs definitely gets jobs from mDDLJobListKey? Is that correct?
mDDLJobListKey will switch between the two queues, and I don't know which one it points to. That's the problem.
So I suggest:
GetDDLJobsInQueue(general)
GetDDLJobsInQueue(addindex)
Please address this comment.
Done, I changed the name of the function.
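The shape being suggested could look like the sketch below: the queue is an explicit argument, so a call site always states which queue it reads and there is no hidden state to set and restore. `Meta`, `JobListKey`, and the in-memory map are hypothetical stand-ins, not the real meta package.

```go
package main

// JobListKey names a job queue. The two values are hypothetical.
type JobListKey string

const (
	DefaultJobListKey  JobListKey = "general"
	AddIndexJobListKey JobListKey = "addindex"
)

// Meta is an in-memory stand-in for meta.Meta that holds job IDs per queue.
type Meta struct {
	queues map[JobListKey][]int64
}

// GetDDLJobsInQueue returns a copy of the job IDs stored in the given queue.
// The receiver is not modified, so no SetJobListKey/defer pair is needed.
func (m *Meta) GetDDLJobsInQueue(key JobListKey) []int64 {
	return append([]int64(nil), m.queues[key]...)
}
```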
ddl/ddl_worker.go
Outdated
        return true, nil
    }
    historyJob, err := t.GetHistoryDDLJob(job.DependencyID)
Will t.GetHistoryDDLJob select the right job queue?
It is better to find the job in the waiting job list, because the history job list may be long.
    @@ -149,6 +146,15 @@ func asyncNotify(ch chan struct{}) {
    // buildJobDependence sets the curjob's dependency-ID.
    // The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
    func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
There will be a read/write conflict in GetAllDDLJobs, because each worker checks the other worker's queue for dependencies.
Is the error retryable and properly handled?
It will be retried.
ddl/ddl_worker_test.go
Outdated
    once := sync.Once{}
    var checkErr error
    tc.onJobRunBefore = func(job *model.Job) {
        // TODO: extract a unified function for use by other tests.
s/for use by other tests/for other tests
ddl/ddl_worker_test.go
Outdated
    if lastJob != nil {
        finishedJobs, err := m.GetAllHistoryDDLJobs()
        c.Assert(err, IsNil)
        // get the last 11 jobs completed。
I found a strange character here.
util/admin/admin.go
Outdated
    @@ -104,6 +104,9 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
            errs[i] = errors.Trace(err)
            continue
        }
        if job.Type == model.ActionAddIndex {
            t.SetJobListKey(meta.AddIndexJobListKey)
        }
Add an else branch so that the code is more robust and does not rely on an assumption about the default value of job.Type.
ddl/ddl.go
Outdated
    @@ -440,7 +449,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
    }
    // Notice worker that we push a new job and wait the job done.
s/Notice/Notify/g
ddl/ddl_test.go
Outdated
        err := d.doDDLJob(ctx, job)
        c.Assert(err, IsNil)
        v := getSchemaVer(c, ctx)
        checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
        return job
    }
    func buildRebaseAutoID(dbInfo *model.DBInfo, tblInfo *model.TableInfo, newBaseID int64) *model.Job {
buildRebaseAutoIDJob
ddl/ddl_worker.go
Outdated
    @@ -149,6 +146,15 @@ func asyncNotify(ch chan struct{}) {
    // buildJobDependence sets the curjob's dependency-ID.
    // The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
    func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
        switch curJob.Type {
Please add comments.
ddl/ddl_worker.go
Outdated
    @@ -162,6 +168,7 @@ func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
            return errors.Trace(err)
        }
        if isDependent {
            log.Infof("[ddl] current DDL job %v is dependent job %v", curJob, job)
How about "current DDL job %v depends on job %v"?
ddl/ddl_worker.go
Outdated
    @@ -348,7 +374,8 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
        return errors.Trace(w.handleUpdateJobError(t, job, err))
    })
    if runJobErr != nil {
    waitDependencyJob := job != nil && job.DependencyID != 0
    if runJobErr != nil || waitDependencyJob {
If waitDependencyJob is true, it is not an error.
Yes, but we'd better wait a moment. I will add a comment for it.
/run-all-tests
    @@ -306,7 +305,6 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
        uuid:         id,
        store:        store,
        lease:        lease,
        ddlJobCh:     make(chan struct{}, 1),
        ddlJobDoneCh: make(chan struct{}, 1),
Could we move ddlJobDoneCh into each worker as well?
Why?
ddl/ddl_worker.go
Outdated
    if historyJob == nil {
        return false, nil
    }
    log.Infof("[ddl] DDL job %v isn't dependent on job ID %d", job, job.DependencyID)
"DDL job %v isn't dependent on job ID %d"? What about "DDL job %v dependent job ID %d is finished"?
    @@ -104,7 +104,11 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
            errs[i] = errors.Trace(err)
            continue
        }
        err = t.UpdateDDLJob(int64(j), job, true)
        if job.Type == model.ActionAddIndex {
            err = t.UpdateDDLJob(int64(j), job, true, meta.AddIndexJobListKey)
Why not just create a new meta with meta.AddIndexJobListKey?
We have a meta here, I think it's OK.
I mean we can use a new meta to avoid adding a parameter to the function.
ddl/ddl.go
Outdated
    @@ -413,11 +411,9 @@ func (d *ddl) genGlobalID() (int64, error) {
    // generalWorker returns the first worker. The ddl structure has only one worker before we implement the parallel worker.
Need to update the comment.
@zimulala Please resolve the conflicts.
ddl/ddl_worker.go
Outdated
    @@ -282,41 +299,61 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
        return errors.Trace(err)
    }
    func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
        if job.DependencyID == 0 {
Please create a constant for 0 or add comment for the if statement.
ddl/ddl_worker.go
Outdated
        return true, nil
    }
    func newMetaWithQueueTp(txn kv.Transaction, tp string) *meta.Meta {
Please address it.
meta/meta.go
Outdated
    @@ -86,11 +86,20 @@ type Meta struct {
    }
    // NewMeta creates a Meta in transaction txn.
    func NewMeta(txn kv.Transaction) *Meta {
    // If the current Meta needs to handle a job, jobListKey is the type of the job's list.
    // We don't change the value of the jobListKey in a Meta.
I cannot understand the comment.
ddl/db_change_test.go
Outdated
    for {
        kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
            m := meta.NewMeta(txn)
            // Get the number of jobs from the adding index queue.
            addIdxLen, err1 := m.DDLJobQueueLen(meta.AddIndexJobListKey)
Can we use GetDDLJobs and get the length of the return value?
    @@ -175,14 +190,18 @@ func (d *ddl) addDDLJob(ctx sessionctx.Context, job *model.Job) error {
        job.Version = currentVersion
        job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
        err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
            t := meta.NewMeta(txn)
            t := newMetaWithQueueTp(txn, job.Type.String())
Why not use meta.AddIndexJobListKey as the second parameter?
The job may not be an add index job, so the meta has to be created according to the job type?
Almost. I want to put the check of job type or worker type into newMetaWithQueueTp.
    @@ -362,6 +401,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
            // No job now, return and retry getting later.
            return nil
        }
        w.waitDependencyJobFinished(job, &waitDependencyJobCnt)
If its dependency job is not done yet, it would return at line 357. So why do we need to wait here?
The return at line 357 is inside the txn func. It does not return from the handleDDLJobQueue func.
As @crazycs520 said. And if we put it at line 357, we would need to wait for 200ms there. I am afraid this txn would conflict easily, so I put it here.
    @@ -86,11 +86,19 @@ type Meta struct {
    }
    // NewMeta creates a Meta in transaction txn.
    func NewMeta(txn kv.Transaction) *Meta {
    // If the current Meta needs to handle a job, jobListKey is the type of the job's list.
    func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta {
Can we always specify the JobListKey?
A lot of places use this function, so I use this method to handle it.
And in other packages, I think we needn't distinguish the type of jobListKeys.
LGTM
LGTM
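For readers unfamiliar with the pattern, a variadic trailing parameter acts as an optional argument, so existing callers keep compiling. The sketch below drops the txn parameter and uses hypothetical key values; the default-key behavior is an assumption about what the real constructor does.

```go
package main

// JobListKeyType names a job queue. The constant values are hypothetical.
type JobListKeyType string

const (
	DefaultJobListKey  JobListKeyType = "default"
	AddIndexJobListKey JobListKeyType = "addindex"
)

// Meta is a stand-in for meta.Meta that only remembers its queue key.
type Meta struct {
	JobListKey JobListKeyType
}

// NewMeta uses the first key if one is given and falls back to the default
// queue otherwise, so callers that pass no key are unaffected.
func NewMeta(jobListKeys ...JobListKeyType) *Meta {
	key := DefaultJobListKey
	if len(jobListKeys) > 0 {
		key = jobListKeys[0]
	}
	return &Meta{JobListKey: key}
}
```

The trade-off raised in the review still applies: the compiler will not flag a caller that forgot to pass the add index key, and extra keys after the first are silently ignored.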
/run-all-tests
/run-common-test
/run-common-test -tidb-test=pr/592
What have you changed? (mandatory)
ddl: remove cleanAddIndexQueueJobs and add initial support for parallel DDL.
Initial support for parallel DDL is as follows:
"Add index" DDL and the other types of DDL can be executed in parallel when they are on different tables. We use two queues to store the "add index" jobs and the other DDL jobs in storage, and we have two workers to handle these jobs. The "add index" worker handles the "add index" queue. The other worker handles the other queue.
If the DDL of "add index" and the other types of DDL are on the same table, we need to perform these two operations serially.
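The two rules above can be sketched as follows. The `Job` struct, the queue names, and both helpers are hypothetical and only illustrate the routing and the same-table check; they are not the implementation in this PR.

```go
package main

// Job is a hypothetical, simplified DDL job.
type Job struct {
	ID      int64
	TableID int64
	Type    string // "add index" or any other DDL type
}

const (
	generalQueue = "general"
	addIdxQueue  = "add index"
)

// queueFor routes "add index" jobs to their own queue and everything else
// to the general queue.
func queueFor(job Job) string {
	if job.Type == "add index" {
		return addIdxQueue
	}
	return generalQueue
}

// mustRunSerially reports whether cur has to wait for prev: the two jobs sit
// in different queues, touch the same table, and prev was submitted first.
func mustRunSerially(cur, prev Job) bool {
	return queueFor(cur) != queueFor(prev) &&
		cur.TableID == prev.TableID &&
		prev.ID < cur.ID
}
```

Jobs in the same queue need no such check in this sketch because a single worker drains each queue in order.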
What is the type of the changes (mandatory)?
The currently defined types are listed below, please pick one of the types for this PR by removing the others:
How has this PR been tested (mandatory)?
unit test
Does this PR affect documentation (docs/docs-cn) update? (optional)
Yes.