Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add CTEProducer that shared by all CTEExec (#44643) #44777

Merged
82 changes: 45 additions & 37 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type executorBuilder struct {
type CTEStorages struct {
ResTbl cteutil.Storage
IterInTbl cteutil.Storage
Producer *cteProducer
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *executorBuilder {
Expand Down Expand Up @@ -4664,33 +4665,39 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *
}

func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
// 1. Build seedPlan.
if b.Ti != nil {
b.Ti.UseNonRecursive = true
}
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
if v.RecurPlan != nil && b.Ti != nil {
b.Ti.UseRecursive = true
}

// 2. Build tables to store intermediate results.
chkSize := b.ctx.GetSessionVars().MaxChunkSize
tps := seedExec.base().retFieldTypes
// iterOutTbl will be constructed in CTEExec.Open().
var resTbl cteutil.Storage
var iterInTbl cteutil.Storage

storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages)
if !ok {
b.err = errors.New("type assertion for CTEStorageMap failed")
return nil
}

chkSize := b.ctx.GetSessionVars().MaxChunkSize
// iterOutTbl will be constructed in CTEExec.Open().
var resTbl cteutil.Storage
var iterInTbl cteutil.Storage
var producer *cteProducer
storages, ok := storageMap[v.CTE.IDForStorage]
if ok {
// Storage already setup.
resTbl = storages.ResTbl
iterInTbl = storages.IterInTbl
producer = storages.Producer
} else {
// Build seed part.
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
}

// Setup storages.
tps := seedExec.base().retFieldTypes
resTbl = cteutil.NewStorageRowContainer(tps, chkSize)
if err := resTbl.OpenAndRef(); err != nil {
b.err = err
Expand All @@ -4702,38 +4709,39 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
return nil
}
storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl}
}

// 3. Build recursive part.
if v.RecurPlan != nil && b.Ti != nil {
b.Ti.UseRecursive = true
}
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
}
// Build recursive part.
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
}
var sel []int
if v.CTE.IsDistinct {
sel = make([]int, chkSize)
for i := 0; i < chkSize; i++ {
sel[i] = i
}
}

var sel []int
if v.CTE.IsDistinct {
sel = make([]int, chkSize)
for i := 0; i < chkSize; i++ {
sel[i] = i
producer = &cteProducer{
ctx: b.ctx,
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
isInApply: v.CTE.IsInApply,
}
storageMap[v.CTE.IDForStorage].Producer = producer
}

return &CTEExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
chkIdx: 0,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
isInApply: v.CTE.IsInApply,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
producer: producer,
}
}

Expand Down
Loading