Skip to content

Commit

Permalink
Merge #38594
Browse files Browse the repository at this point in the history
38594: opt: fetch minimal set of columns on returning mutations r=ridwanmsharif a=ridwanmsharif

Previously, we fetched all columns when a mutation contained
a `RETURNING` clause. This is an issue because it forces us to
retrieve unnecessary data and creates extra contention.
This change adds logic to compute the minimal set of required columns
and fetches only those.

Fixes #30618.
Unblocks #30624.

Release note: None

Co-authored-by: Ridwan Sharif <ridwan@cockroachlabs.com>
  • Loading branch information
craig[bot] and Ridwan Sharif committed Jul 16, 2019
2 parents 5dfa38c + 5a44334 commit 8ee12cb
Show file tree
Hide file tree
Showing 23 changed files with 955 additions and 192 deletions.
21 changes: 19 additions & 2 deletions pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func (p *planner) Delete(
requestedCols = desc.Columns
}

// Since all columns are being returned, use the 1:1 mapping. See todo above.
rowIdxToRetIdx := mutationRowIdxToReturnIdx(requestedCols, requestedCols)

// Create the table deleter, which does the bulk of the work.
rd, err := row.MakeDeleter(
p.txn, desc, fkTables, requestedCols, row.CheckFKs, p.EvalContext(), &p.alloc,
Expand Down Expand Up @@ -175,6 +178,7 @@ func (p *planner) Delete(
td: tableDeleter{rd: rd, alloc: &p.alloc},
rowsNeeded: rowsNeeded,
fastPathInterleaved: canDeleteFastInterleaved(desc, fkTables),
rowIdxToRetIdx: rowIdxToRetIdx,
},
}

Expand Down Expand Up @@ -208,6 +212,13 @@ type deleteRun struct {

// traceKV caches the current KV tracing flag.
traceKV bool

// rowIdxToRetIdx is the mapping from the columns returned by the deleter
// to the columns in the resultRowBuffer. A value of -1 is used to indicate
// that the column at that index is not part of the resultRowBuffer
// of the mutation. Otherwise, the value at the i-th index refers to the
// index of the resultRowBuffer where the i-th column is to be returned.
rowIdxToRetIdx []int
}

// maxDeleteBatchSize is the max number of entries in the KV batch for
Expand Down Expand Up @@ -331,9 +342,15 @@ func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums)
// contain additional columns for every newly dropped column not
// visible. We do not want them to be available for RETURNING.
//
// d.columns is guaranteed to only contain the requested
// The d.run.rows container is guaranteed to only contain the requested
// public columns.
resultValues := sourceVals[:len(d.columns)]
resultValues := make(tree.Datums, d.run.rows.NumCols())
for i, retIdx := range d.run.rowIdxToRetIdx {
if retIdx >= 0 {
resultValues[retIdx] = sourceVals[i]
}
}

if _, err := d.run.rows.AddRow(params.ctx, resultValues); err != nil {
return err
}
Expand Down
45 changes: 32 additions & 13 deletions pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ func (p *planner) Insert(
columns = sqlbase.ResultColumnsFromColDescs(desc.Columns)
}

// Since all columns are being returned, use the 1:1 mapping.
tabColIdxToRetIdx := make([]int, len(desc.Columns))
for i := range tabColIdxToRetIdx {
tabColIdxToRetIdx[i] = i
}

// At this point, everything is ready for either an insertNode or an upsertNode.

var node batchedPlanNode
Expand Down Expand Up @@ -315,8 +321,9 @@ func (p *planner) Insert(
Cols: desc.Columns,
Mapping: ri.InsertColIDtoRowIndex,
},
defaultExprs: defaultExprs,
insertCols: ri.InsertCols,
defaultExprs: defaultExprs,
insertCols: ri.InsertCols,
tabColIdxToRetIdx: tabColIdxToRetIdx,
},
}
node = in
Expand Down Expand Up @@ -368,12 +375,21 @@ type insertRun struct {
// into the row container above, when rowsNeeded is set.
resultRowBuffer tree.Datums

// rowIdxToRetIdx is the mapping from the ordering of rows in
// insertCols to the ordering in the result rows, used when
// rowIdxToTabColIdx is the mapping from the ordering of rows in
// insertCols to the ordering in the rows in the table, used when
// rowsNeeded is set to populate resultRowBuffer and the row
// container. The return index is -1 if the column for the row
// index is not public.
rowIdxToRetIdx []int
// index is not public. This is used in conjunction with tabColIdxToRetIdx
// to populate the resultRowBuffer.
rowIdxToTabColIdx []int

// tabColIdxToRetIdx is the mapping from the columns in the table to the
// columns in the resultRowBuffer. A value of -1 is used to indicate
// that the table column at that index is not part of the resultRowBuffer
// of the mutation. Otherwise, the value at the i-th index refers to the
// index of the resultRowBuffer where the i-th column of the table is
// to be returned.
tabColIdxToRetIdx []int

// traceKV caches the current KV tracing flag.
traceKV bool
Expand Down Expand Up @@ -405,8 +421,8 @@ func (n *insertNode) startExec(params runParams) error {
// re-ordering the data into resultRowBuffer.
//
// Also we need to re-order the values in the source, ordered by
// insertCols, when writing them to resultRowBuffer, ordered by
// n.columns. This uses the rowIdxToRetIdx mapping.
// insertCols, when writing them to resultRowBuffer, according to
// the rowIdxToTabColIdx mapping.

n.run.resultRowBuffer = make(tree.Datums, len(n.columns))
for i := range n.run.resultRowBuffer {
Expand All @@ -419,13 +435,13 @@ func (n *insertNode) startExec(params runParams) error {
colIDToRetIndex[cols[i].ID] = i
}

n.run.rowIdxToRetIdx = make([]int, len(n.run.insertCols))
n.run.rowIdxToTabColIdx = make([]int, len(n.run.insertCols))
for i, col := range n.run.insertCols {
if idx, ok := colIDToRetIndex[col.ID]; !ok {
// Column must be write only and not public.
n.run.rowIdxToRetIdx[i] = -1
n.run.rowIdxToTabColIdx[i] = -1
} else {
n.run.rowIdxToRetIdx[i] = idx
n.run.rowIdxToTabColIdx[i] = idx
}
}
}
Expand Down Expand Up @@ -567,10 +583,13 @@ func (n *insertNode) processSourceRow(params runParams, sourceVals tree.Datums)
// The downstream consumer will want the rows in the order of
// the table descriptor, not that of insertCols. Reorder them
// and ignore non-public columns.
if idx := n.run.rowIdxToRetIdx[i]; idx >= 0 {
n.run.resultRowBuffer[idx] = val
if tabIdx := n.run.rowIdxToTabColIdx[i]; tabIdx >= 0 {
if retIdx := n.run.tabColIdxToRetIdx[tabIdx]; retIdx >= 0 {
n.run.resultRowBuffer[retIdx] = val
}
}
}

if _, err := n.run.rows.AddRow(params.ctx, n.run.resultRowBuffer); err != nil {
return err
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/opt/bench/stub_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ func (f *stubFactory) ConstructInsert(
input exec.Node,
table cat.Table,
insertCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
rowsNeeded bool,
skipFKChecks bool,
) (exec.Node, error) {
return struct{}{}, nil
Expand All @@ -234,8 +234,8 @@ func (f *stubFactory) ConstructUpdate(
table cat.Table,
fetchCols exec.ColumnOrdinalSet,
updateCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
rowsNeeded bool,
) (exec.Node, error) {
return struct{}{}, nil
}
Expand All @@ -247,14 +247,17 @@ func (f *stubFactory) ConstructUpsert(
insertCols exec.ColumnOrdinalSet,
fetchCols exec.ColumnOrdinalSet,
updateCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
rowsNeeded bool,
) (exec.Node, error) {
return struct{}{}, nil
}

func (f *stubFactory) ConstructDelete(
input exec.Node, table cat.Table, fetchCols exec.ColumnOrdinalSet, rowsNeeded bool,
input exec.Node,
table cat.Table,
fetchCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
) (exec.Node, error) {
return struct{}{}, nil
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ func (b *Builder) buildInsert(ins *memo.InsertExpr) (execPlan, error) {
tab := b.mem.Metadata().Table(ins.Table)
insertOrds := ordinalSetFromColList(ins.InsertCols)
checkOrds := ordinalSetFromColList(ins.CheckCols)
returnOrds := ordinalSetFromColList(ins.ReturnCols)
// If we planned FK checks, disable the execution code for FK checks.
disableExecFKs := len(ins.Checks) > 0
node, err := b.factory.ConstructInsert(
input.root,
tab,
insertOrds,
returnOrds,
checkOrds,
ins.NeedResults(),
disableExecFKs,
)
if err != nil {
Expand Down Expand Up @@ -106,14 +107,15 @@ func (b *Builder) buildUpdate(upd *memo.UpdateExpr) (execPlan, error) {
tab := md.Table(upd.Table)
fetchColOrds := ordinalSetFromColList(upd.FetchCols)
updateColOrds := ordinalSetFromColList(upd.UpdateCols)
returnColOrds := ordinalSetFromColList(upd.ReturnCols)
checkOrds := ordinalSetFromColList(upd.CheckCols)
node, err := b.factory.ConstructUpdate(
input.root,
tab,
fetchColOrds,
updateColOrds,
returnColOrds,
checkOrds,
upd.NeedResults(),
)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -177,6 +179,7 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
insertColOrds := ordinalSetFromColList(ups.InsertCols)
fetchColOrds := ordinalSetFromColList(ups.FetchCols)
updateColOrds := ordinalSetFromColList(ups.UpdateCols)
returnColOrds := ordinalSetFromColList(ups.ReturnCols)
checkOrds := ordinalSetFromColList(ups.CheckCols)
node, err := b.factory.ConstructUpsert(
input.root,
Expand All @@ -185,8 +188,8 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
insertColOrds,
fetchColOrds,
updateColOrds,
returnColOrds,
checkOrds,
ups.NeedResults(),
)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -230,7 +233,8 @@ func (b *Builder) buildDelete(del *memo.DeleteExpr) (execPlan, error) {
md := b.mem.Metadata()
tab := md.Table(del.Table)
fetchColOrds := ordinalSetFromColList(del.FetchCols)
node, err := b.factory.ConstructDelete(input.root, tab, fetchColOrds, del.NeedResults())
returnColOrds := ordinalSetFromColList(del.ReturnCols)
node, err := b.factory.ConstructDelete(input.root, tab, fetchColOrds, returnColOrds)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -310,6 +314,9 @@ func appendColsWhenPresent(dst, src opt.ColList) opt.ColList {
// indicating columns that are not involved in the mutation.
func ordinalSetFromColList(colList opt.ColList) exec.ColumnOrdinalSet {
var res util.FastIntSet
if colList == nil {
return res
}
for i, col := range colList {
if col != 0 {
res.Add(i)
Expand Down
30 changes: 14 additions & 16 deletions pkg/sql/opt/exec/execbuilder/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -233,19 +233,17 @@ COMMIT
query TTTTT colnames
EXPLAIN (VERBOSE) SELECT * FROM v
----
tree field description columns ordering
render · · (k) ·
│ render 0 k · ·
└── run · · (k, v, z) ·
└── update · · (k, v, z) ·
│ table kv · ·
│ set v · ·
│ strategy updater · ·
└── render · · (k, v, z, column7) ·
│ render 0 k · ·
│ render 1 v · ·
│ render 2 z · ·
│ render 3 444 · ·
└── scan · · (k, v, z) ·
· table kv@primary · ·
· spans /1- · ·
tree field description columns ordering
run · · (k) ·
└── update · · (k) ·
│ table kv · ·
│ set v · ·
│ strategy updater · ·
└── render · · (k, v, z, column7) ·
│ render 0 k · ·
│ render 1 v · ·
│ render 2 z · ·
│ render 3 444 · ·
└── scan · · (k, v, z) ·
· table kv@primary · ·
· spans /1- · ·
19 changes: 8 additions & 11 deletions pkg/sql/opt/exec/execbuilder/testdata/delete
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,11 @@ count · ·
query TTT
EXPLAIN DELETE FROM indexed WHERE value = 5 LIMIT 10 RETURNING id
----
render · ·
└── run · ·
└── delete · ·
│ from indexed
│ strategy deleter
└── index-join · ·
│ table indexed@primary
└── scan · ·
· table indexed@indexed_value_idx
· spans /5-/6
· limit 10
run · ·
└── delete · ·
│ from indexed
│ strategy deleter
└── scan · ·
· table indexed@indexed_value_idx
· spans /5-/6
· limit 10
28 changes: 14 additions & 14 deletions pkg/sql/opt/exec/execbuilder/testdata/insert
Original file line number Diff line number Diff line change
Expand Up @@ -480,20 +480,20 @@ CREATE TABLE xyz (x INT, y INT, z INT)
query TTTTT
EXPLAIN (VERBOSE) SELECT * FROM [INSERT INTO xyz SELECT a, b, c FROM abc RETURNING z] ORDER BY z
----
render · · (z) +z
│ render 0 z · ·
└── run · · (x, y, z, rowid[hidden]) ·
└── insert · · (x, y, z, rowid[hidden]) ·
│ into xyz(x, y, z, rowid) · ·
│ strategy inserter · ·
└── render · · (a, b, c, column9) +c
│ render 0 a · ·
│ render 1 b · ·
│ render 2 c · ·
│ render 3 unique_rowid() · ·
└── scan · · (a, b, c) +c
· table abc@abc_c_idx · ·
· spans ALL · ·
render · · (z) +z
│ render 0 z · ·
└── run · · (z, rowid[hidden]) ·
└── insert · · (z, rowid[hidden]) ·
│ into xyz(x, y, z, rowid) · ·
│ strategy inserter · ·
└── render · · (a, b, c, column9) +c
│ render 0 a · ·
│ render 1 b · ·
│ render 2 c · ·
│ render 3 unique_rowid() · ·
└── scan · · (a, b, c) +c
· table abc@abc_c_idx · ·
· spans ALL · ·

# ------------------------------------------------------------------------------
# Regression for #35364. This tests behavior that is different between the CBO
Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/opt/exec/execbuilder/testdata/orderby
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ EXPLAIN (VERBOSE) INSERT INTO t(a, b) SELECT * FROM (SELECT 1 AS x, 2 AS y) ORDE
----
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b, c) ·
└── insert · · (a, b, c) ·
└── run · · (a, b) ·
└── insert · · (a, b) ·
│ into t(a, b, c) · ·
│ strategy inserter · ·
└── values · · (x, y, column6) ·
Expand All @@ -496,23 +496,23 @@ render · · (b) ·
query TTTTT
EXPLAIN (VERBOSE) DELETE FROM t WHERE a = 3 RETURNING b
----
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b, c) ·
└── delete · · (a, b, c) ·
│ from t · ·
│ strategy deleter · ·
└── scan · · (a, b, c) ·
· table t@primary · ·
· spans /3-/3/# · ·
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b) ·
└── delete · · (a, b) ·
│ from t · ·
│ strategy deleter · ·
└── scan · · (a, b) ·
· table t@primary · ·
· spans /3-/3/# · ·

query TTTTT
EXPLAIN (VERBOSE) UPDATE t SET c = TRUE RETURNING b
----
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b, c) ·
└── update · · (a, b, c) ·
└── run · · (a, b) ·
└── update · · (a, b) ·
│ table t · ·
│ set c · ·
│ strategy updater · ·
Expand Down
Loading

0 comments on commit 8ee12cb

Please sign in to comment.