Skip to content

Commit

Permalink
add Detach() method to detach the record set
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Jun 19, 2024
1 parent f124165 commit 6583c38
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 8 deletions.
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ go_library(
"//pkg/executor/lockstats",
"//pkg/executor/metrics",
"//pkg/executor/sortexec",
"//pkg/executor/staticrecordset",
"//pkg/executor/unionexec",
"//pkg/expression",
"//pkg/expression/aggregation",
Expand Down
31 changes: 24 additions & 7 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics"
"github.com/pingcap/tidb/pkg/executor/staticrecordset"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/keyspace"
Expand Down Expand Up @@ -204,7 +205,7 @@ func (a *recordSet) Finish() error {
defer a.finishLock.Unlock()
a.once.Do(func() {
err = exec.Close(a.executor)
cteErr := resetCTEStorageMap(a.stmt.Ctx)
cteErr := resetCTEStorageMap(a.stmt.Ctx.GetSessionVars().StmtCtx.CTEStorageMap)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
Expand Down Expand Up @@ -241,6 +242,21 @@ func (a *recordSet) OnFetchReturned() {
a.stmt.LogSlowQuery(a.txnStartTS, len(a.lastErrs) == 0, true)
}

// Detach creates a new `RecordSet` which doesn't depend on the current session context.
func (a *recordSet) Detach() (sqlexec.RecordSet, error) {
cteStorage := a.stmt.Ctx.GetSessionVars().StmtCtx.CTEStorageMap
closeFn := func() error {
return resetCTEStorageMap(cteStorage)
}
// TODO: also detach the executor. Currently, the executor inside may contain the session context. Once
// the executor itself supports detach, we should also detach it here.
e, ok := a.executor.(*TableReaderExecutor)
if !ok {
return nil, errors.New("executor is not supported")
}
return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false), closeFn), nil
}

// ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement.
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
Expand Down Expand Up @@ -793,7 +809,7 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e exec.Executor, isPessimi
// `rs.Close` in `handleStmt`
if handled && sc != nil && rs == nil {
sc.DetachMemDiskTracker()
cteErr := resetCTEStorageMap(a.Ctx)
cteErr := resetCTEStorageMap(a.Ctx.GetSessionVars().StmtCtx.CTEStorageMap)
if err == nil {
// Only overwrite err when it's nil.
err = cteErr
Expand Down Expand Up @@ -1504,16 +1520,16 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
// Will return err in two situations:
// 1. Got err when remove disk spill file.
// 2. Some logical error like ref count of CTEStorage is less than 0.
func resetCTEStorageMap(se sessionctx.Context) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
func resetCTEStorageMap(anyStorageMap any) error {
if anyStorageMap == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*CTEStorages)
storageMap, ok := anyStorageMap.(map[int]*CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}

for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
Expand All @@ -1528,7 +1544,8 @@ func resetCTEStorageMap(se sessionctx.Context) error {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
// avoid calling `DerefAndClose` multiple times on the same `CTEStorage` in `TraceExec`
clear(storageMap)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestCTEDelSpillFile(t *testing.T) {
tk.MustExec("set @@tidb_mem_quota_query = 100;")
tk.MustExec("insert into t2 values(1);")
tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;")
require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap)
require.Empty(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap)
}

func TestCTEShareCorColumn(t *testing.T) {
Expand Down
35 changes: 35 additions & 0 deletions pkg/executor/staticrecordset/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "staticrecordset",
srcs = [
"recordset.go",
"txn.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/staticrecordset",
visibility = ["//visibility:public"],
deps = [
"//pkg/executor/internal/exec",
"//pkg/kv",
"//pkg/parser/ast",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "staticrecordset_test",
timeout = "short",
srcs = ["integration_test.go"],
flaky = True,
shard_count = 3,
deps = [
"//pkg/testkit",
"//pkg/util/sqlexec",
"@com_github_stretchr_testify//require",
],
)
102 changes: 102 additions & 0 deletions pkg/executor/staticrecordset/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staticrecordset_test

import (
"context"
"testing"

"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
)

func TestStaticRecordSet(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
drs := rs.(sqlexec.DetachableRecordSet)
srs, err := drs.Detach()
require.NoError(t, err)

// check schema
require.Len(t, srs.Fields(), 1)
require.Equal(t, "id", srs.Fields()[0].Column.Name.O)

// check data
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.NoError(t, err)
require.Equal(t, 3, chk.NumRows())
require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0))
require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0))
require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0))

require.NoError(t, srs.Close())
}

func TestStaticRecordSetWithTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
drs := rs.(sqlexec.DetachableRecordSet)
srs, err := drs.Detach()
require.NoError(t, err)

// Now, it's fine to run another statement on the session
// remove all existing data in the table
tk.MustExec("truncate table t")

// check data
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.NoError(t, err)
require.Equal(t, 3, chk.NumRows())
require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0))
require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0))
require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0))

require.NoError(t, srs.Close())
}

func TestDetachError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

// explicit transaction is not allowed
tk.MustExec("begin")
rs, err := tk.Exec("select * from t")
require.NoError(t, err)
drs2 := rs.(sqlexec.DetachableRecordSet)
_, err = drs2.Detach()
require.Error(t, err)
tk.MustExec("commit")
}
86 changes: 86 additions & 0 deletions pkg/executor/staticrecordset/recordset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staticrecordset

import (
"context"

"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

var _ sqlexec.RecordSet = &staticRecordSet{}

type staticRecordSet struct {
fields []*ast.ResultField
executor exec.Executor

sqlText string

closeFn func() error
}

// New creates a new staticRecordSet
func New(fields []*ast.ResultField, executor exec.Executor, sqlText string, closeFn func() error) sqlexec.RecordSet {
return &staticRecordSet{
fields: fields,
executor: executor,
sqlText: sqlText,
closeFn: closeFn,
}
}

func (s *staticRecordSet) Fields() []*ast.ResultField {
return s.fields
}

func (s *staticRecordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {
defer func() {
r := recover()
if r == nil {
return
}
err = util.GetRecoverError(r)
logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", s.sqlText), zap.Stack("stack"))
}()

return exec.Next(ctx, s.executor, req)
}

// NewChunk create a chunk base on top-level executor's exec.NewFirstChunk().
func (s *staticRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
if alloc == nil {
return exec.NewFirstChunk(s.executor)
}

return alloc.Alloc(s.executor.RetFieldTypes(), s.executor.InitCap(), s.executor.MaxChunkSize())
}

// Close closes the executor.
func (s *staticRecordSet) Close() error {
err := exec.Close(s.executor)
err2 := s.closeFn()
if err == nil {
err = err2
}
s.executor = nil

return err
}
67 changes: 67 additions & 0 deletions pkg/executor/staticrecordset/txn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staticrecordset

import (
"context"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/tikv/client-go/v2/tikv"
)

var _ sqlexec.RecordSet = &recordSetWithTxn{}

type recordSetWithTxn struct {
rs sqlexec.RecordSet
txn kv.Transaction
}

// WrapRecordSetWithStartTS wraps the recordSet to automatically start a new transaction with the given startTS.
func WrapRecordSetWithStartTS(rs sqlexec.RecordSet, startTS uint64, store kv.Storage) (sqlexec.RecordSet, error) {
txn, err := store.Begin(tikv.WithStartTS(startTS))
if err != nil {
return nil, err
}
return &recordSetWithTxn{rs: rs, txn: txn}, nil
}

// Fields gets result fields.
func (r *recordSetWithTxn) Fields() []*ast.ResultField {
return r.rs.Fields()
}

// Next reads records into chunk.
func (r *recordSetWithTxn) Next(ctx context.Context, req *chunk.Chunk) error {
return r.rs.Next(ctx, req)
}

// NewChunk create a chunk, if allocator is nil, the default one is used.
func (r *recordSetWithTxn) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return r.rs.NewChunk(alloc)
}

// Close closes the underlying iterator, call Next after Close will
// restart the iteration.
func (r *recordSetWithTxn) Close() error {
err1 := r.rs.Close()
err2 := r.txn.Commit(context.Background())
if err1 != nil {
return err1
}
return err2
}
1 change: 1 addition & 0 deletions pkg/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/domain/infosync",
"//pkg/errno",
"//pkg/executor",
"//pkg/executor/staticrecordset",
"//pkg/expression",
"//pkg/expression/context",
"//pkg/expression/contextsession",
Expand Down
Loading

0 comments on commit 6583c38

Please sign in to comment.