-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store/tikv: refactor copIterator #2804
Conversation
make the code simpler, so it's easy to reason about: * change tasks' struct from array to list, which benefit for rebuildCurrentTask * remove the complex lock related code * fix memory leak
store/tikv/coprocessor.go
Outdated
} | ||
} | ||
} | ||
|
||
func (it *copIterator) run(ctx goctx.Context) { | ||
for t := it.tasks; t != nil; t = t.next { | ||
it.addTask(t) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
task channel may be full, blocked forever.
store/tikv/coprocessor.go
Outdated
it.mu.tasks[i].idx = i | ||
} | ||
// This is simple, just replace the old task node with the new task list. | ||
replaceNodeWithList(task, newTasks) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new tasks could be processed by other workers before be inserted to list.
store/tikv/coprocessor.go
Outdated
// The worker function that get a copTask from channel, handle it and | ||
// send the result back. | ||
func (it *copIterator) work(ctx goctx.Context, taskCh <-chan *copTask) { | ||
for task := range taskCh { | ||
bo := NewBackoffer(copNextMaxBackoff, ctx) | ||
startTime := time.Now() | ||
resp, err := it.handleTask(bo, task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about handleTask
returns multiple responses, then we can make sure the new tasks is handled immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! I'll have a try.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The multiple responses can be sent to the original task's channel.
Instead of just read one response, the reader reads multiple responses from a task's channel until the channel is closed.
Then we don't need a task list because the original tasks never change.
d48f1c1
to
5244954
Compare
store/tikv/coprocessor.go
Outdated
it.mu.Lock() | ||
task.status = taskDone | ||
it.mu.Unlock() | ||
it.curr++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only read 1 response from each task?
Looks much better. |
LGTM |
9e36404
to
7c104f1
Compare
PTAL @disksing |
store/tikv/coprocessor.go
Outdated
finished chan struct{} | ||
taskCh chan *copTask | ||
|
||
// If keepOrder, result is stored in copTask.respChan, read them out one by one. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/result is/results are/
99cc756
to
7c7fdb2
Compare
store/tikv/coprocessor.go
Outdated
// Rebuild current task. It may be split into multiple tasks (in region split scenario). | ||
func (it *copIterator) rebuildCurrentTask(bo *Backoffer, task *copTask) error { | ||
// Rebuild and handle current task. It may be split into multiple tasks (in region split scenario). | ||
func (it *copIterator) rebuildCurrentTask(bo *Backoffer, task *copTask) []copResponse { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pick a better name for this function. How about handleRegionErrorTask?
LGTM |
make the code simpler, so it's easy to reason about:
PTAL @coocood @disksing @shenli @zimulala