From 5fe2a580f143d2c41dacb447a95bdd9176ee7d8e Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 24 Mar 2020 11:24:53 +0800 Subject: [PATCH 1/2] cherry pick #15592 to release-3.1 Signed-off-by: sre-bot --- distsql/select_result.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/distsql/select_result.go b/distsql/select_result.go index 843e8b048ba61..c2a0d6e05be72 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -16,10 +16,12 @@ package distsql import ( "context" "fmt" + "sync/atomic" "time" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" @@ -33,6 +35,10 @@ import ( "go.uber.org/zap" ) +var ( + errQueryInterrupted = terror.ClassExecutor.NewStd(errno.ErrQueryInterrupted) +) + var ( _ SelectResult = (*selectResult)(nil) _ SelectResult = (*streamResult)(nil) @@ -105,6 +111,7 @@ func (r *selectResult) fetch(ctx context.Context) { var result resultWithErr resultSubset, err := r.resp.Next(ctx) if err != nil { +<<<<<<< HEAD result.err = err } else if resultSubset == nil { // If the result is drained, the resultSubset would be nil @@ -112,6 +119,29 @@ func (r *selectResult) fetch(ctx context.Context) { } else { result.result = resultSubset r.memConsume(int64(resultSubset.MemSize())) +======= + return errors.Trace(err) + } + r.selectRespSize = r.selectResp.Size() + r.memConsume(int64(r.selectRespSize)) + if err := r.selectResp.Error; err != nil { + return terror.ClassTiKV.Synthesize(terror.ErrCode(err.Code), err.Msg) + } + sessVars := r.ctx.GetSessionVars() + if atomic.CompareAndSwapUint32(&sessVars.Killed, 1, 0) { + return errors.Trace(errQueryInterrupted) + } + sc := sessVars.StmtCtx + for _, warning := range r.selectResp.Warnings { + sc.AppendWarning(terror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg)) + } + r.updateCopRuntimeStats(resultSubset.GetExecDetails(), resultSubset.RespTime()) + r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts) + r.partialCount++ + sc.MergeExecDetails(resultSubset.GetExecDetails(), nil) + if len(r.selectResp.Chunks) != 0 { + break +>>>>>>> 00dc69c... distsql: check the killed flag in the distsql package (#15592) } select { From ed2b87d7f5cc8ef195eb2a6832ade09da1b0f34a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 30 Mar 2020 13:22:41 +0800 Subject: [PATCH 2/2] resolve conflict --- distsql/select_result.go | 34 +++++++--------------------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index c2a0d6e05be72..d03ee2838f6cb 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -20,8 +20,8 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" @@ -36,7 +36,7 @@ import ( ) var ( - errQueryInterrupted = terror.ClassExecutor.NewStd(errno.ErrQueryInterrupted) + errQueryInterrupted = terror.ClassExecutor.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted]) ) var ( @@ -111,7 +111,6 @@ func (r *selectResult) fetch(ctx context.Context) { var result resultWithErr resultSubset, err := r.resp.Next(ctx) if err != nil { -<<<<<<< HEAD result.err = err } else if resultSubset == nil { // If the result is drained, the resultSubset would be nil @@ -119,29 +118,6 @@ func (r *selectResult) fetch(ctx context.Context) { } else { result.result = resultSubset r.memConsume(int64(resultSubset.MemSize())) -======= - return errors.Trace(err) - } - r.selectRespSize = r.selectResp.Size() - r.memConsume(int64(r.selectRespSize)) - if err := r.selectResp.Error; err != nil { - return terror.ClassTiKV.Synthesize(terror.ErrCode(err.Code), err.Msg) - } - sessVars := r.ctx.GetSessionVars() - if atomic.CompareAndSwapUint32(&sessVars.Killed, 1, 0) { - return errors.Trace(errQueryInterrupted) - } - sc := sessVars.StmtCtx - for _, warning := range r.selectResp.Warnings { - sc.AppendWarning(terror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg)) - } - r.updateCopRuntimeStats(resultSubset.GetExecDetails(), resultSubset.RespTime()) - r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts) - r.partialCount++ - sc.MergeExecDetails(resultSubset.GetExecDetails(), nil) - if len(r.selectResp.Chunks) != 0 { - break ->>>>>>> 00dc69c... distsql: check the killed flag in the distsql package (#15592) } select { @@ -218,7 +194,11 @@ func (r *selectResult) getSelectResp() error { if err := r.selectResp.Error; err != nil { return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg) } - sc := r.ctx.GetSessionVars().StmtCtx + sessVars := r.ctx.GetSessionVars() + if atomic.CompareAndSwapUint32(&sessVars.Killed, 1, 0) { + return errors.Trace(errQueryInterrupted) + } + sc := sessVars.StmtCtx for _, warning := range r.selectResp.Warnings { sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg)) }