Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: coprocess streaming tiny fix #6186

Merged
merged 14 commits into from
Apr 4, 2018
7 changes: 6 additions & 1 deletion store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tikv

import (
"io"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/terror"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -254,7 +256,10 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
var first *coprocessor.Response
first, err = copStream.Recv()
if err != nil {
return nil, errors.Trace(err)
if errors.Cause(err) != io.EOF {
return nil, errors.Trace(err)
}
log.Debug("copstream returns nothing for the request.")
}
copStream.Response = first
return resp, nil
Expand Down
16 changes: 11 additions & 5 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,15 +612,19 @@ func (it *copIterator) handleTaskOnce(bo *Backoffer, task *copTask, ch chan copR
}

// Handles the response for non-streaming copTask.
return it.handleCopResponse(bo, resp.Cop, task, ch)
return it.handleCopResponse(bo, resp.Cop, task, ch, resp.Cop)
}

func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan copResponse) ([]*copTask, error) {
defer stream.Close()
var resp, lastResp *coprocessor.Response
resp = stream.Response
if resp == nil {
// streaming request returns io.EOF, so the first Response is nil.
return nil, nil
}
for {
remainedTasks, err := it.handleCopResponse(bo, resp, task, ch)
remainedTasks, err := it.handleCopResponse(bo, resp, task, ch, lastResp)
if err != nil || len(remainedTasks) != 0 {
return remainedTasks, errors.Trace(err)
}
Expand Down Expand Up @@ -652,7 +656,9 @@ func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopS

// handleCopResponse checks coprocessor Response for region split and lock,
// returns more tasks when that happens, or handles the response if no error.
func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse) ([]*copTask, error) {
// if we're handling streaming coprocessor response, lastResp is the last successful response,
// if we're handling normal request, lastResp and resp is the same.
func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse, lastResp *coprocessor.Response) ([]*copTask, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the comment for lastResp?

Copy link
Member

@coocood coocood Apr 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using lastRange *KeyRange instead of lastResp *coprocessor.Response?
It's easier to understand the purpose of the argument.

if regionErr := resp.GetRegionError(); regionErr != nil {
if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil {
return nil, errors.Trace(err)
Expand All @@ -672,7 +678,7 @@ func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Respon
return nil, errors.Trace(err)
}
}
return buildCopTasksFromRemain(bo, it.store.regionCache, resp, task, it.req.Desc, it.req.Streaming)
return buildCopTasksFromRemain(bo, it.store.regionCache, lastResp, task, it.req.Desc, it.req.Streaming)
}
if otherErr := resp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
Expand All @@ -692,7 +698,7 @@ func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Respon

func buildCopTasksFromRemain(bo *Backoffer, cache *RegionCache, resp *coprocessor.Response, task *copTask, desc bool, streaming bool) ([]*copTask, error) {
remainedRanges := task.ranges
if streaming {
if streaming && resp != nil {
remainedRanges = calculateRemain(task.ranges, resp.Range, desc)
}
return buildCopTasks(bo, cache, remainedRanges, desc, streaming)
Expand Down