Skip to content

Commit

Permalink
Add method to detach the tableReaderExecutor from the session context
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed May 16, 2024
1 parent f0af776 commit 82d36eb
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 134 deletions.
2 changes: 1 addition & 1 deletion pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type DistSQLContext struct {
EnabledRateLimitAction bool
EnableChunkRPC bool
OriginalSQL string
KVVars *tikvstore.Variables
KVVars tikvstore.Variables
KvExecCounter *stmtstats.KvExecCounter
SessionMemTracker *memory.Tracker

Expand Down
2 changes: 1 addition & 1 deletion pkg/distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func Select(ctx context.Context, dctx *distsqlctx.DistSQLContext, kvReq *kv.Requ
option.AppendWarning = dctx.AppendWarning
}

resp := dctx.Client.Send(ctx, kvReq, dctx.KVVars, option)
resp := dctx.Client.Send(ctx, kvReq, &dctx.KVVars, option)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ go_library(
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/expression/context",
"//pkg/expression/contextsession",
"//pkg/infoschema",
"//pkg/infoschema/context",
"//pkg/keyspace",
Expand Down
19 changes: 18 additions & 1 deletion pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
internalutil "github.com/pingcap/tidb/pkg/executor/internal/util"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/expression/contextsession"
"github.com/pingcap/tidb/pkg/infoschema"
isctx "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -82,7 +83,7 @@ type tableReaderExecutorContext struct {
dctx *distsqlctx.DistSQLContext
rctx *rangerctx.RangerContext
buildPBCtx *planctx.BuildPBContext
ectx exprctx.BuildContext
ectx exprctx.ExprContext

stmtMemTracker *memory.Tracker

Expand All @@ -102,6 +103,22 @@ func (treCtx *tableReaderExecutorContext) GetDDLOwner(ctx context.Context) (*inf
return nil, errors.New("GetDDLOwner in a context without DDL")
}

// IntoStatic detaches the current context from the original session context.
//
// NOTE: For `dctx`, `rctx`... most of the fields don't need to be handled specially, because they are already copied from the session context.
// some reference types like `WarnHandler` also doesn't need to copy because a new statement will always creates a new `WarnHandler`, so it's
// safe to continue to use it here. We'll need to call `IntoStatic` method for `evalCtx` and `exprCtx`, because maybe they are implemented by
// the session context directly.
func (treCtx *tableReaderExecutorContext) IntoStatic() {
if sctx, ok := treCtx.ectx.(*contextsession.SessionExprContext); ok {
staticECtx := sctx.IntoStatic()

treCtx.rctx.IntoStatic(staticECtx)
treCtx.buildPBCtx.IntoStatic(staticECtx)
treCtx.ectx = staticECtx
}
}

func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorContext {
// Explicitly get `ownerManager` out of the closure to show that the `tableReaderExecutorContext` itself doesn't
// depend on `sctx` directly.
Expand Down
1 change: 1 addition & 0 deletions pkg/expression/contextsession/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//pkg/errctx",
"//pkg/expression/context",
"//pkg/expression/contextopt",
"//pkg/expression/contextstatic",
"//pkg/infoschema/context",
"//pkg/parser/auth",
"//pkg/parser/model",
Expand Down
86 changes: 86 additions & 0 deletions pkg/expression/contextsession/sessionctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/pkg/errctx"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/expression/contextopt"
"github.com/pingcap/tidb/pkg/expression/contextstatic"
infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -131,6 +132,26 @@ func (ctx *SessionExprContext) ConnectionID() uint64 {
return ctx.sctx.GetSessionVars().ConnectionID
}

// IntoStatic turns the SessionExprContext into a StaticExprContext.
func (ctx *SessionExprContext) IntoStatic() *contextstatic.StaticExprContext {
staticEvalContext := ctx.SessionEvalContext.IntoStatic()
return contextstatic.NewStaticExprContext(
contextstatic.WithEvalCtx(staticEvalContext),
contextstatic.WithCharset(ctx.GetCharsetInfo()),
contextstatic.WithDefaultCollationForUTF8MB4(ctx.GetDefaultCollationForUTF8MB4()),
contextstatic.WithBlockEncryptionMode(ctx.GetBlockEncryptionMode()),
contextstatic.WithSysDateIsNow(ctx.GetSysdateIsNow()),
contextstatic.WithNoopFuncsMode(ctx.GetNoopFuncsMode()),
contextstatic.WithRng(ctx.Rng()),
contextstatic.WithPlanCacheTracker(ctx.sctx.GetSessionVars().StmtCtx.PlanCacheTracker),
contextstatic.WithColumnIDAllocator(
exprctx.NewSimplePlanColumnIDAllocator(ctx.sctx.GetSessionVars().PlanColumnID.Load())),
contextstatic.WithConnectionID(ctx.ConnectionID()),
contextstatic.WithWindowingUseHighPrecision(ctx.GetWindowingUseHighPrecision()),
contextstatic.WithGroupConcatMaxLen(ctx.GetGroupConcatMaxLen()),
)
}

// SessionEvalContext implements the `expression.EvalContext` interface to provide evaluation context in session.
type SessionEvalContext struct {
sctx sessionctx.Context
Expand Down Expand Up @@ -273,6 +294,71 @@ func (ctx *SessionEvalContext) RequestDynamicVerification(privName string, grant
return checker.RequestDynamicVerification(ctx.sctx.GetSessionVars().ActiveRoles, privName, grantable)
}

// IntoStatic turns the SessionEvalContext into a StaticEvalContext.
func (ctx *SessionEvalContext) IntoStatic() *contextstatic.StaticEvalContext {
typeCtx := ctx.TypeCtx()
errCtx := ctx.ErrCtx()

props := make([]exprctx.OptionalEvalPropProvider, 0, exprctx.OptPropsCnt)
for i := 0; i < exprctx.OptPropsCnt; i++ {
// TODO: check whether these `prop` is safe to copy
if prop, ok := ctx.GetOptionalPropProvider(exprctx.OptionalEvalPropKey(i)); ok {
props = append(props, prop)
}
}

// TODO: use a more structural way to replace the closure.
// These closure makes sure the fields which may be changed in the execution of the next statement will not be embeded into them, to make
// sure it's safe to call them after the session continues to execute other statements.
staticCtx := contextstatic.NewStaticEvalContext(
contextstatic.WithWarnHandler(ctx.sctx.GetSessionVars().StmtCtx.WarnHandler),
contextstatic.WithSQLMode(ctx.SQLMode()),
contextstatic.WithTypeFlags(typeCtx.Flags()),
contextstatic.WithLocation(typeCtx.Location()),
contextstatic.WithErrLevelMap(errCtx.LevelMap()),
contextstatic.WithCurrentDB(ctx.CurrentDB()),
contextstatic.WithCurrentTime(func() func() (time.Time, error) {
currentTime, currentTimeErr := ctx.CurrentTime()

return func() (time.Time, error) {
return currentTime, currentTimeErr
}
}()),
contextstatic.WithMaxAllowedPacket(ctx.GetMaxAllowedPacket()),
contextstatic.WithDefaultWeekFormatMode(ctx.GetDefaultWeekFormatMode()),
contextstatic.WithDivPrecisionIncrement(ctx.GetDivPrecisionIncrement()),
contextstatic.WithPrivCheck(func() func(db string, table string, column string, priv mysql.PrivilegeType) bool {
checker := privilege.GetPrivilegeManager(ctx.sctx)
activeRoles := make([]*auth.RoleIdentity, len(ctx.sctx.GetSessionVars().ActiveRoles))
copy(activeRoles, ctx.sctx.GetSessionVars().ActiveRoles)

return func(db string, table string, column string, priv mysql.PrivilegeType) bool {
if checker == nil {
return true
}

return checker.RequestVerification(activeRoles, db, table, column, priv)
}
}()),
contextstatic.WithDynamicPrivCheck(func() func(privName string, grantable bool) bool {
checker := privilege.GetPrivilegeManager(ctx.sctx)
activeRoles := make([]*auth.RoleIdentity, len(ctx.sctx.GetSessionVars().ActiveRoles))
copy(activeRoles, ctx.sctx.GetSessionVars().ActiveRoles)

return func(privName string, grantable bool) bool {
if checker == nil {
return true
}

return checker.RequestDynamicVerification(activeRoles, privName, grantable)
}
}()),
contextstatic.WithOptionalProperty(props...),
)

return staticCtx
}

func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) {
if ctx != nil {
staleTSO, err := ctx.GetSessionVars().StmtCtx.GetStaleTSO()
Expand Down
2 changes: 1 addition & 1 deletion pkg/expression/contextstatic/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ go_test(
],
embed = [":contextstatic"],
flaky = True,
shard_count = 9,
shard_count = 8,
deps = [
"//pkg/errctx",
"//pkg/expression/context",
Expand Down
34 changes: 9 additions & 25 deletions pkg/expression/contextstatic/exprctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
package contextstatic

import (
"sync/atomic"

exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/mathutil"
)
Expand All @@ -39,8 +38,7 @@ type staticExprCtxState struct {
sysDateIsNow bool
noopFuncsMode int
rng *mathutil.MysqlRng
canUseCache *atomic.Bool
skipCacheHandleFunc func(useCache *atomic.Bool, skipReason string)
planCacheTracker *contextutil.PlanCacheTracker
columnIDAllocator exprctx.PlanColumnIDAllocator
connectionID uint64
windowingUseHighPrecision bool
Expand Down Expand Up @@ -103,17 +101,11 @@ func WithRng(rng *mathutil.MysqlRng) StaticExprCtxOption {
}
}

// WithUseCache sets the return value of `IsUseCache` for `StaticExprContext`.
func WithUseCache(useCache bool) StaticExprCtxOption {
// WithPlanCacheTracker sets the plan cache tracker for `StaticExprContext`.
func WithPlanCacheTracker(tracker *contextutil.PlanCacheTracker) StaticExprCtxOption {
intest.AssertNotNil(tracker)
return func(s *staticExprCtxState) {
s.canUseCache.Store(useCache)
}
}

// WithSkipCacheHandleFunc sets inner skip plan cache function for StaticExprContext
func WithSkipCacheHandleFunc(fn func(useCache *atomic.Bool, skipReason string)) StaticExprCtxOption {
return func(s *staticExprCtxState) {
s.skipCacheHandleFunc = fn
s.planCacheTracker = tracker
}
}

Expand Down Expand Up @@ -171,8 +163,7 @@ func NewStaticExprContext(opts ...StaticExprCtxOption) *StaticExprContext {
},
}

ctx.canUseCache = &atomic.Bool{}
ctx.canUseCache.Store(true)
ctx.planCacheTracker = contextutil.NewPlanCacheTracker(ctx.evalCtx)

for _, opt := range opts {
opt(&ctx.staticExprCtxState)
Expand All @@ -199,9 +190,6 @@ func (ctx *StaticExprContext) Apply(opts ...StaticExprCtxOption) *StaticExprCont
staticExprCtxState: ctx.staticExprCtxState,
}

newCtx.canUseCache = &atomic.Bool{}
newCtx.canUseCache.Store(ctx.canUseCache.Load())

for _, opt := range opts {
opt(&newCtx.staticExprCtxState)
}
Expand Down Expand Up @@ -246,16 +234,12 @@ func (ctx *StaticExprContext) Rng() *mathutil.MysqlRng {

// IsUseCache implements the `ExprContext.IsUseCache`.
func (ctx *StaticExprContext) IsUseCache() bool {
return ctx.canUseCache.Load()
return ctx.planCacheTracker.UseCache()
}

// SetSkipPlanCache implements the `ExprContext.SetSkipPlanCache`.
func (ctx *StaticExprContext) SetSkipPlanCache(reason string) {
if fn := ctx.skipCacheHandleFunc; fn != nil {
fn(ctx.canUseCache, reason)
return
}
ctx.canUseCache.Store(false)
ctx.planCacheTracker.SetSkipPlanCache(reason)
}

// AllocPlanColumnID implements the `ExprContext.AllocPlanColumnID`.
Expand Down
Loading

0 comments on commit 82d36eb

Please sign in to comment.