Skip to content

Commit

Permalink
Add a function to detach the TableReaderExecutor
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Jul 15, 2024
1 parent df78562 commit 641b42e
Show file tree
Hide file tree
Showing 19 changed files with 512 additions and 128 deletions.
5 changes: 4 additions & 1 deletion pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/ranger"
Expand Down Expand Up @@ -86,9 +87,11 @@ func newReorgExprCtx() exprctx.ExprContext {
contextstatic.WithErrLevelMap(stmtctx.DefaultStmtErrLevels),
)

planCacheTracker := contextutil.NewPlanCacheTracker(contextutil.IgnoreWarn)

return contextstatic.NewStaticExprContext(
contextstatic.WithEvalCtx(evalCtx),
contextstatic.WithUseCache(false),
contextstatic.WithPlanCacheTracker(&planCacheTracker),
)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/distsql/context/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
"//pkg/util/context",
"//pkg/util/execdetails",
"//pkg/util/memory",
"//pkg/util/nocopy",
"//pkg/util/sqlkiller",
"//pkg/util/tiflash",
"//pkg/util/topsql/stmtstats",
Expand Down
29 changes: 23 additions & 6 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/nocopy"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
Expand All @@ -34,11 +33,6 @@ import (

// DistSQLContext provides all information needed by using functions in `distsql`
type DistSQLContext struct {
// TODO: provide a `Clone` to copy this struct.
// The life cycle of some fields in this struct cannot be extended. For example, some fields will be recycled before
// the next execution. They'll need to be handled specially.
_ nocopy.NoCopy

WarnHandler contextutil.WarnAppender

InRestrictedSQL bool
Expand Down Expand Up @@ -95,3 +89,26 @@ type DistSQLContext struct {
func (dctx *DistSQLContext) AppendWarning(warn error) {
dctx.WarnHandler.AppendWarning(warn)
}

// Detach detaches this context from the session context.
//
// NOTE: Though this session context can be used parallelly with this context after calling
// it, the `StatementContext` cannot. The session context should create a new `StatementContext`
// before executing another statement.
func (dctx *DistSQLContext) Detach() *DistSQLContext {
newCtx := *dctx

// TODO: this SQLKiller is not connected with the original one, so the user will have no way to kill
// the SQL running with the detached context. The current implementation of `SQLKiller` doesn't support
// tracking a reference which may run across multiple statements, because before executing any statement
//, the `SQLKiller` will always be reset.
//
// A simple way to fix it is to use the origianl SQLKiller, and wait for all cursor to be closed after
// receiving a kill signal and before resetting it. For now, it uses a newly created `SQLKiller` to avoid
// affecting the original one and keep safety.
newCtx.SQLKiller = &sqlkiller.SQLKiller{}
newCtx.KVVars = new(tikvstore.Variables)
*newCtx.KVVars = *dctx.KVVars
newCtx.KVVars.Killed = &newCtx.SQLKiller.Signal
return &newCtx
}
6 changes: 6 additions & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"cte_table_reader.go",
"ddl.go",
"delete.go",
"detach.go",
"distsql.go",
"executor.go",
"explain.go",
Expand Down Expand Up @@ -126,6 +127,7 @@ go_library(
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/expression/context",
"//pkg/expression/contextsession",
"//pkg/extension",
"//pkg/infoschema",
"//pkg/infoschema/context",
Expand Down Expand Up @@ -309,6 +311,8 @@ go_test(
"compact_table_test.go",
"copr_cache_test.go",
"delete_test.go",
"detach_integration_test.go",
"detach_test.go",
"distsql_test.go",
"executor_failpoint_test.go",
"executor_pkg_test.go",
Expand Down Expand Up @@ -389,6 +393,7 @@ go_test(
"//pkg/executor/sortexec",
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/expression/contextstatic",
"//pkg/extension",
"//pkg/infoschema",
"//pkg/kv",
Expand Down Expand Up @@ -451,6 +456,7 @@ go_test(
"//pkg/util/ranger",
"//pkg/util/sem",
"//pkg/util/set",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/stmtsummary/v2:stmtsummary",
"//pkg/util/stringutil",
Expand Down
9 changes: 6 additions & 3 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,18 @@ func (a *recordSet) OnFetchReturned() {

// Detach creates a new `RecordSet` which doesn't depend on the current session context.
func (a *recordSet) TryDetach() (sqlexec.RecordSet, bool, error) {
// TODO: also detach the executor. Currently, the executor inside may contain the session context. Once
// the executor itself supports detach, we should also detach it here.
e, ok := a.executor.(*TableReaderExecutor)
e, ok := Detach(a.executor)
if !ok {
return nil, false, nil
}
return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false)), true, nil
}

// GetExecutor4Test exports the internal executor for test purpose.
func (a *recordSet) GetExecutor4Test() any {
return a.executor
}

// ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement.
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
Expand Down
74 changes: 74 additions & 0 deletions pkg/executor/detach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression/contextsession"
)

// Detach detaches the current executor from the session context. After detaching, the session context
// can be used to execute another statement while this executor is still running. The returning value
// shows whether this executor is able to be detached.
//
// NOTE: the implementation of `Detach` should guarantee that no matter whether it returns true or false,
// both the original executor and the returning executor should be able to be used correctly. This restriction
// is to make sure that if `Detach(a)` returns `true`, while other children of `a`'s parent returns `false`,
// the caller can still use the original one.
func Detach(originalExecutor exec.Executor) (exec.Executor, bool) {
newExecutor, ok := originalExecutor.Detach()
if !ok {
return nil, false
}

children := originalExecutor.AllChildren()
newChildren := make([]exec.Executor, len(children))
for i, child := range children {
detached, ok := Detach(child)
if !ok {
return nil, false
}
newChildren[i] = detached
}
copy(newExecutor.AllChildren(), newChildren)

return newExecutor, true
}

func (treCtx tableReaderExecutorContext) Detach() tableReaderExecutorContext {
newCtx := treCtx

if ctx, ok := treCtx.ectx.(*contextsession.SessionExprContext); ok {
staticExprCtx := ctx.IntoStatic()

newCtx.dctx = newCtx.dctx.Detach()
newCtx.rctx = newCtx.rctx.Detach(staticExprCtx)
newCtx.buildPBCtx = newCtx.buildPBCtx.Detach(staticExprCtx)
newCtx.ectx = staticExprCtx
return newCtx
}

return treCtx
}

// Detach detaches the current executor from the session context.
func (e *TableReaderExecutor) Detach() (exec.Executor, bool) {
newExec := new(TableReaderExecutor)
*newExec = *e

newExec.tableReaderExecutorContext = newExec.tableReaderExecutorContext.Detach()

return newExec, true
}
174 changes: 174 additions & 0 deletions pkg/executor/detach_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
"context"
"strconv"
"sync"
"sync/atomic"
"testing"

"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
)

type exportExecutor interface {
GetExecutor4Test() any
}

func TestDetachAllContexts(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
tk.MustExec("create table t (a int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
oldExecutor := rs.(exportExecutor).GetExecutor4Test().(exec.Executor)

drs := rs.(sqlexec.DetachableRecordSet)
srs, ok, err := drs.TryDetach()
require.True(t, ok)
require.NoError(t, err)

require.NotEqual(t, rs, srs)
newExecutor := srs.(exportExecutor).GetExecutor4Test().(exec.Executor)

require.NotEqual(t, oldExecutor, newExecutor)
// Children should be different
for i, child := range oldExecutor.AllChildren() {
require.NotEqual(t, child, newExecutor.AllChildren()[i])
}

// Then execute another statement
tk.MustQuery("select * from t limit 1").Check(testkit.Rows("1"))
// The previous detached record set can still be used
// check data
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.NoError(t, err)
require.Equal(t, 3, chk.NumRows())
require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0))
require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0))
require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0))
}

func TestAfterDetachSessionCanExecute(t *testing.T) {
// This test shows that the session can be safely used to execute another statement after detaching.
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
tk.MustExec("create table t (a int)")
for i := 0; i < 10000; i++ {
tk.MustExec("insert into t values (?)", i)
}

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
drs, ok, err := rs.(sqlexec.DetachableRecordSet).TryDetach()
require.NoError(t, err)
require.True(t, ok)

// Now, the `drs` can be used concurrently with the session.
var wg sync.WaitGroup
var stop atomic.Bool
wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < 10000; i++ {
if stop.Load() {
return
}
tk.MustQuery("select * from t where a = ?", i).Check(testkit.Rows(strconv.Itoa(i)))
}
}()

chk := drs.NewChunk(nil)
expectedSelect := 0
for {
err = drs.Next(context.Background(), chk)
require.NoError(t, err)

if chk.NumRows() == 0 {
break
}
for i := 0; i < chk.NumRows(); i++ {
require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0))
expectedSelect++
}
}
stop.Store(true)
wg.Wait()
}

func TestDetachWithParam(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
tk.MustExec("create table t (a int primary key)")
for i := 0; i < 10000; i++ {
tk.MustExec("insert into t values (?)", i)
}

rs, err := tk.Exec("select * from t where a > ? and a < ?", 100, 200)
require.NoError(t, err)
drs, ok, err := rs.(sqlexec.DetachableRecordSet).TryDetach()
require.NoError(t, err)
require.True(t, ok)

// Now, execute another statement with different size of param. It'll not affect the execution of detached executor.
var wg sync.WaitGroup
var stop atomic.Bool
wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < 10000; i++ {
if stop.Load() {
return
}
tk.MustQuery("select * from t where a = ?", i).Check(testkit.Rows(strconv.Itoa(i)))
}
}()

chk := drs.NewChunk(nil)
expectedSelect := 101
for {
err = drs.Next(context.Background(), chk)
require.NoError(t, err)

if chk.NumRows() == 0 {
break
}
for i := 0; i < chk.NumRows(); i++ {
require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0))
expectedSelect++
}
}
stop.Store(true)
wg.Wait()
}
Loading

0 comments on commit 641b42e

Please sign in to comment.