Skip to content

Commit

Permalink
executor, expression: detach projection and selection operator (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao authored and hawkingrei committed Aug 1, 2024
1 parent 8d42156 commit b5c7a64
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 7 deletions.
55 changes: 55 additions & 0 deletions pkg/executor/detach.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/contextsession"
)

Expand Down Expand Up @@ -86,6 +87,24 @@ func (iluCtx indexLookUpExecutorContext) Detach() indexLookUpExecutorContext {
return iluCtx
}

func (pCtx projectionExecutorContext) Detach() projectionExecutorContext {
newCtx := pCtx
if ctx, ok := pCtx.evalCtx.(*contextsession.SessionEvalContext); ok {
newCtx.evalCtx = ctx.IntoStatic()
}

return newCtx
}

func (sCtx selectionExecutorContext) Detach() selectionExecutorContext {
newCtx := sCtx
if ctx, ok := sCtx.evalCtx.(*contextsession.SessionEvalContext); ok {
newCtx.evalCtx = ctx.IntoStatic()
}

return newCtx
}

// Detach detaches the current executor from the session context.
func (e *TableReaderExecutor) Detach() (exec.Executor, bool) {
newExec := new(TableReaderExecutor)
Expand Down Expand Up @@ -115,3 +134,39 @@ func (e *IndexLookUpExecutor) Detach() (exec.Executor, bool) {

return newExec, true
}

// Detach detaches the current executor from the session context.
func (e *ProjectionExec) Detach() (exec.Executor, bool) {
// check whether the `Projection` requires any optional property
// Now, no optional property is copied, so if it requires any optional property, it should return false.
// TODO: some optional property can be detached. If they are implemented in the future, this check needs to be changed.
if !e.evaluatorSuit.RequiredOptionalEvalProps().IsEmpty() {
return nil, false
}

newExec := new(ProjectionExec)
*newExec = *e

newExec.projectionExecutorContext = newExec.projectionExecutorContext.Detach()

return newExec, true
}

// Detach detaches the current executor from the session context.
func (e *SelectionExec) Detach() (exec.Executor, bool) {
// check whether the `Selection` requires any optional property
// Now, no optional property is copied, so if it requires any optional property, it should return false.
// TODO: some optional property can be detached. If they are implemented in the future, this check needs to be changed.
for _, expr := range e.filters {
if !expression.GetOptionalEvalPropsForExpr(expr).IsEmpty() {
return nil, false
}
}

newExec := new(SelectionExec)
*newExec = *e

newExec.selectionExecutorContext = newExec.selectionExecutorContext.Detach()

return newExec, true
}
113 changes: 113 additions & 0 deletions pkg/executor/detach_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,116 @@ func TestDetachIndexReaderAndIndexLookUp(t *testing.T) {
}
}
}

func TestDetachSelection(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
tk.MustExec("create table t (a int, b int, c int, key idx_a_b (a,b), key idx_b (b))")
for i := 0; i < 10000; i++ {
tk.MustExec("insert into t values (?, ?, ?)", i, i, i)
}

tk.MustHavePlan("select a, b from t where c > 100 and c < 200", "Selection")
rs, err := tk.Exec("select a, b from t where c > ? and c < ?", 100, 200)
require.NoError(t, err)
drs, ok, err := rs.(sqlexec.DetachableRecordSet).TryDetach()
require.NoError(t, err)
require.True(t, ok)

chk := drs.NewChunk(nil)
expectedSelect := 101
for {
err = drs.Next(context.Background(), chk)
require.NoError(t, err)

if chk.NumRows() == 0 {
break
}
for i := 0; i < chk.NumRows(); i++ {
require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0))
require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(1))
expectedSelect++
}
}

// Selection with optional property is not allowed
tk.MustExec("set @a = 1")
tk.MustHavePlan("select a from t where a + @a > 100 and a < 200", "Selection")
rs, err = tk.Exec("select a from t where a + @a > ? and a < ?", 100, 200)
require.NoError(t, err)
drs, ok, _ = rs.(sqlexec.DetachableRecordSet).TryDetach()
require.False(t, ok)
require.Nil(t, drs)
require.NoError(t, rs.Close())
}

func TestDetachProjection(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
tk.MustExec("create table t (a int, b int, c int, key idx_a_b (a,b), key idx_b (b))")
for i := 0; i < 10000; i++ {
tk.MustExec("insert into t values (?, ?, ?)", i, i, i)
}

tk.MustHavePlan("select a + b from t where a > 100 and a < 200", "Projection")
rs, err := tk.Exec("select a + b from t where a > ? and a < ?", 100, 200)
require.NoError(t, err)
drs, ok, err := rs.(sqlexec.DetachableRecordSet).TryDetach()
require.NoError(t, err)
require.True(t, ok)

chk := drs.NewChunk(nil)
expectedSelect := 101
for {
err = drs.Next(context.Background(), chk)
require.NoError(t, err)

if chk.NumRows() == 0 {
break
}
for i := 0; i < chk.NumRows(); i++ {
require.Equal(t, int64(2*expectedSelect), chk.GetRow(i).GetInt64(0))
expectedSelect++
}
}

// Projection with optional property is not allowed
tk.MustExec("set @a = 1")
tk.MustHavePlan("select a + @a from t where a > 100 and a < 200", "Projection")
rs, err = tk.Exec("select a + @a from t where a > ? and a < ?", 100, 200)
require.NoError(t, err)
drs, ok, _ = rs.(sqlexec.DetachableRecordSet).TryDetach()
require.False(t, ok)
require.Nil(t, drs)
require.NoError(t, rs.Close())

// Projection with Selection is also allowed
tk.MustHavePlan("select a + b from t where c > 100 and c < 200", "Projection")
tk.MustHavePlan("select a + b from t where c > 100 and c < 200", "Selection")
rs, err = tk.Exec("select a + b from t where c > ? and c < ?", 100, 200)
require.NoError(t, err)
drs, ok, err = rs.(sqlexec.DetachableRecordSet).TryDetach()
require.NoError(t, err)
require.True(t, ok)

chk = drs.NewChunk(nil)
expectedSelect = 101
for {
err = drs.Next(context.Background(), chk)
require.NoError(t, err)

if chk.NumRows() == 0 {
break
}
for i := 0; i < chk.NumRows(); i++ {
require.Equal(t, int64(2*expectedSelect), chk.GetRow(i).GetInt64(0))
expectedSelect++
}
}
}
4 changes: 3 additions & 1 deletion pkg/executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type ProjectionExec struct {
parentReqRows int64

memTracker *memory.Tracker
wg sync.WaitGroup
wg *sync.WaitGroup

calculateNoDelay bool
prepared bool
Expand Down Expand Up @@ -137,6 +137,8 @@ func (e *ProjectionExec) open(_ context.Context) error {
e.memTracker.Consume(e.childResult.MemoryUsage())
}

e.wg = &sync.WaitGroup{}

return nil
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/expression/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,19 @@ func (e *defaultEvaluator) run(ctx EvalContext, vecEnabled bool, input, output *
func (e *defaultEvaluator) RequiredOptionalEvalProps() context.OptionalEvalPropKeySet {
props := context.OptionalEvalPropKeySet(0)
for _, expr := range e.exprs {
props = props | getOptionalEvalPropsForExpr(expr)
props = props | GetOptionalEvalPropsForExpr(expr)
}

return props
}

func getOptionalEvalPropsForExpr(expr Expression) context.OptionalEvalPropKeySet {
// GetOptionalEvalPropsForExpr gets all optional evaluation properties that this expression requires.
func GetOptionalEvalPropsForExpr(expr Expression) context.OptionalEvalPropKeySet {
switch e := expr.(type) {
case *ScalarFunction:
props := e.Function.RequiredOptionalEvalProps()
for _, arg := range e.GetArgs() {
props = props | getOptionalEvalPropsForExpr(arg)
props = props | GetOptionalEvalPropsForExpr(arg)
}

return props
Expand Down Expand Up @@ -157,3 +158,12 @@ func (e *EvaluatorSuite) Run(ctx EvalContext, vecEnabled bool, input, output *ch
}
return nil
}

// RequiredOptionalEvalProps exposes all optional evaluation properties that this evaluator requires.
func (e *EvaluatorSuite) RequiredOptionalEvalProps() context.OptionalEvalPropKeySet {
if e.defaultEvaluator != nil {
return e.defaultEvaluator.RequiredOptionalEvalProps()
}

return 0
}
4 changes: 2 additions & 2 deletions pkg/expression/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,10 @@ func TestOptionalProp(t *testing.T) {

require.Equal(t, context.OptionalEvalPropKeySet(0), f.RequiredOptionalEvalProps())
require.Equal(t, context.OptPropCurrentUser.AsPropKeySet()|context.OptPropDDLOwnerInfo.AsPropKeySet(),
getOptionalEvalPropsForExpr(fe))
GetOptionalEvalPropsForExpr(fe))
require.Equal(t, context.OptPropCurrentUser.AsPropKeySet()|context.OptPropDDLOwnerInfo.AsPropKeySet()|
context.OptPropAdvisoryLock.AsPropKeySet(),
getOptionalEvalPropsForExpr(fe)|getOptionalEvalPropsForExpr(fe2))
GetOptionalEvalPropsForExpr(fe)|GetOptionalEvalPropsForExpr(fe2))

evalSuit := NewEvaluatorSuite([]Expression{fe, fe2}, false)
require.Equal(t, context.OptPropCurrentUser.AsPropKeySet()|context.OptPropDDLOwnerInfo.AsPropKeySet()|
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/tests/cursor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 5,
shard_count = 8,
deps = [
"//pkg/config",
"//pkg/metrics",
Expand Down
Loading

0 comments on commit b5c7a64

Please sign in to comment.