Skip to content

Commit

Permalink
server,executor: split ResultSet Close() to Finish() and Close() (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Dec 8, 2023
1 parent 0133ced commit d23e1c3
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 31 deletions.
42 changes: 27 additions & 15 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"runtime/trace"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -90,6 +91,7 @@ type recordSet struct {
stmt *ExecStmt
lastErr error
txnStartTS uint64
once sync.Once
}

func (a *recordSet) Fields() []*ast.ResultField {
Expand Down Expand Up @@ -179,13 +181,31 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return alloc.Alloc(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
}

func (a *recordSet) Finish() error {
var err error
a.once.Do(func() {
err = a.executor.Close()
cteErr := resetCTEStorageMap(a.stmt.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if err == nil {
err = cteErr
}
})
if err != nil {
a.lastErr = err
}
return err
}

func (a *recordSet) Close() error {
err := a.executor.Close()
err1 := a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
err := a.Finish()
if err != nil {
return err
logutil.BgLogger().Error("close recordSet error", zap.Error(err))
}
return err1
a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
return err
}

// OnFetchReturned implements commandLifeCycle#OnFetchReturned
Expand Down Expand Up @@ -871,7 +891,8 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
}

func (c *chunkRowRecordSet) Close() error {
return c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
return nil
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e exec.Executor) (_ sqlexec.RecordSet, retErr error) {
Expand Down Expand Up @@ -1448,19 +1469,10 @@ func (a *ExecStmt) checkPlanReplayerCapture(txnTS uint64) {
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) error {
cteErr := resetCTEStorageMap(a.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if lastErr == nil {
// Only overwrite err when it's nil.
lastErr = cteErr
}
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
a.Ctx.GetSessionVars().StmtCtx.DetachMemDiskTracker()
return cteErr
}

// Clean CTE storage shared by different CTEFullScan executor within a SQL stmt.
Expand Down
8 changes: 3 additions & 5 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,9 +983,7 @@ func (cc *clientConn) initConnect(ctx context.Context) error {
break
}
}
if err := rs.Close(); err != nil {
return err
}
rs.Close()
}
}
logutil.Logger(ctx).Debug("init_connect complete")
Expand Down Expand Up @@ -2050,7 +2048,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
// - If the rs is nil and err is not nil, the detachment will be done in
// the `handleNoDelay`.
if rs != nil {
defer terror.Call(rs.Close)
defer rs.Close()
}
if err != nil {
// If error is returned during the planner phase or the executor.Open
Expand Down Expand Up @@ -2318,7 +2316,7 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs resultset.ResultSet, b
stmtDetail.WriteSQLRespDuration += time.Since(start)
}
}
if err := rs.Close(); err != nil {
if err := rs.Finish(); err != nil {
return false, err
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/server/internal/dump"
"github.com/pingcap/tidb/pkg/server/internal/parse"
Expand Down Expand Up @@ -304,7 +303,7 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
execStmt.SetText(charset.EncodingUTF8Impl, sql)
rs, err := (&cc.ctx).ExecuteStmt(ctx, execStmt)
if rs != nil {
defer terror.Call(rs.Close)
defer rs.Close()
}
if err != nil {
// If error is returned during the planner phase or the executor.Open
Expand Down
1 change: 1 addition & 0 deletions pkg/server/internal/resultset/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/server/internal/resultset",
visibility = ["//pkg/server:__subpackages__"],
deps = [
"//pkg/parser/terror",
"//pkg/planner/core",
"//pkg/server/internal/column",
"//pkg/types",
Expand Down
18 changes: 13 additions & 5 deletions pkg/server/internal/resultset/resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync/atomic"

"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/server/internal/column"
"github.com/pingcap/tidb/pkg/types"
Expand All @@ -30,11 +31,12 @@ type ResultSet interface {
Columns() []*column.Info
NewChunk(chunk.Allocator) *chunk.Chunk
Next(context.Context, *chunk.Chunk) error
Close() error
Close()
// IsClosed checks whether the result set is closed.
IsClosed() bool
FieldTypes() []*types.FieldType
SetPreparedStmt(stmt *core.PlanCacheStmt)
Finish() error
}

var _ ResultSet = &tidbResultSet{}
Expand Down Expand Up @@ -62,13 +64,19 @@ func (trs *tidbResultSet) Next(ctx context.Context, req *chunk.Chunk) error {
return trs.recordSet.Next(ctx, req)
}

func (trs *tidbResultSet) Close() error {
func (trs *tidbResultSet) Finish() error {
if x, ok := trs.recordSet.(interface{ Finish() error }); ok {
return x.Finish()
}
return nil
}

func (trs *tidbResultSet) Close() {
if !atomic.CompareAndSwapInt32(&trs.closed, 0, 1) {
return nil
return
}
err := trs.recordSet.Close()
terror.Call(trs.recordSet.Close)
trs.recordSet = nil
return err
}

// IsClosed implements ResultSet.IsClosed interface.
Expand Down
21 changes: 17 additions & 4 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2441,18 +2441,31 @@ type execStmtResult struct {
sqlexec.RecordSet
se *session
sql sqlexec.Statement
once sync.Once
closed bool
}

func (rs *execStmtResult) Finish() error {
var err error
rs.once.Do(func() {
var err1 error
if f, ok := rs.RecordSet.(interface{ Finish() error }); ok {
err1 = f.Finish()
}
err2 := finishStmt(context.Background(), rs.se, err, rs.sql)
err = stderrs.Join(err1, err2)
})
return err
}

func (rs *execStmtResult) Close() error {
if rs.closed {
return nil
}
se := rs.se
err := rs.RecordSet.Close()
err = finishStmt(context.Background(), se, err, rs.sql)
err1 := rs.Finish()
err2 := rs.RecordSet.Close()
rs.closed = true
return err
return stderrs.Join(err1, err2)
}

// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
Expand Down

0 comments on commit d23e1c3

Please sign in to comment.