Skip to content

Commit

Permalink
executor: add CTEProducer that shared by all CTEExec (#44643) (#44777)
Browse files Browse the repository at this point in the history
close #44649
  • Loading branch information
ti-chi-bot committed Oct 10, 2023
1 parent 2d27ec6 commit 55b705d
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 290 deletions.
82 changes: 45 additions & 37 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type executorBuilder struct {
type CTEStorages struct {
ResTbl cteutil.Storage
IterInTbl cteutil.Storage
Producer *cteProducer
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *executorBuilder {
Expand Down Expand Up @@ -4664,33 +4665,39 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *
}

func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
// 1. Build seedPlan.
if b.Ti != nil {
b.Ti.UseNonRecursive = true
}
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
if v.RecurPlan != nil && b.Ti != nil {
b.Ti.UseRecursive = true
}

// 2. Build tables to store intermediate results.
chkSize := b.ctx.GetSessionVars().MaxChunkSize
tps := seedExec.base().retFieldTypes
// iterOutTbl will be constructed in CTEExec.Open().
var resTbl cteutil.Storage
var iterInTbl cteutil.Storage

storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages)
if !ok {
b.err = errors.New("type assertion for CTEStorageMap failed")
return nil
}

chkSize := b.ctx.GetSessionVars().MaxChunkSize
// iterOutTbl will be constructed in CTEExec.Open().
var resTbl cteutil.Storage
var iterInTbl cteutil.Storage
var producer *cteProducer
storages, ok := storageMap[v.CTE.IDForStorage]
if ok {
// Storage already setup.
resTbl = storages.ResTbl
iterInTbl = storages.IterInTbl
producer = storages.Producer
} else {
// Build seed part.
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
}

// Setup storages.
tps := seedExec.base().retFieldTypes
resTbl = cteutil.NewStorageRowContainer(tps, chkSize)
if err := resTbl.OpenAndRef(); err != nil {
b.err = err
Expand All @@ -4702,38 +4709,39 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
return nil
}
storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl}
}

// 3. Build recursive part.
if v.RecurPlan != nil && b.Ti != nil {
b.Ti.UseRecursive = true
}
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
}
// Build recursive part.
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
}
var sel []int
if v.CTE.IsDistinct {
sel = make([]int, chkSize)
for i := 0; i < chkSize; i++ {
sel[i] = i
}
}

var sel []int
if v.CTE.IsDistinct {
sel = make([]int, chkSize)
for i := 0; i < chkSize; i++ {
sel[i] = i
producer = &cteProducer{
ctx: b.ctx,
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
isInApply: v.CTE.IsInApply,
}
storageMap[v.CTE.IDForStorage].Producer = producer
}

return &CTEExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
chkIdx: 0,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
isInApply: v.CTE.IsInApply,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
producer: producer,
}
}

Expand Down
Loading

0 comments on commit 55b705d

Please sign in to comment.