Skip to content

Commit

Permalink
Merge pull request #41420 from jordanlewis/backport19.2-41324
Browse files Browse the repository at this point in the history
release-19.2: sql,opt: re-enable 1pc for small delRange ops
  • Loading branch information
jordanlewis authored Oct 23, 2019
2 parents bf0301e + 662bdb1 commit 918d925
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 48 deletions.
130 changes: 91 additions & 39 deletions pkg/sql/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -26,9 +27,11 @@ import (
// conditions that permit the direct use of the DeleteRange kv operation,
// instead of many point deletes.
//
// Note: deleteRangeNode can't autocommit, because it has to delete in batches,
// and it won't know whether or not there is more work to do until after a batch
// is returned. This property precludes using auto commit.
// Note: deleteRangeNode can't autocommit in the general case, because it has to
// delete in batches, and it won't know whether or not there is more work to do
// until after a batch is returned. This property precludes using auto commit.
// However, if the optimizer can prove that only a small number of rows will
// be deleted, it'll enable autoCommit for delete range.
type deleteRangeNode struct {
// interleavedFastPath is true if we can take the fast path despite operating
// on an interleaved table.
Expand All @@ -41,6 +44,12 @@ type deleteRangeNode struct {
// we can count the number of rows deleted.
fetcher row.Fetcher

// autoCommitEnabled is set to true if the optimizer proved that we can safely
// use autocommit, i.e. that the number of keys this operation could return
// is low. If this is true, we won't attempt to run the delete in batches and
// will just send one big delete with a commit statement attached.
autoCommitEnabled bool

// rowCount will be set to the count of rows deleted.
rowCount int
}
Expand Down Expand Up @@ -160,48 +169,44 @@ func (d *deleteRangeNode) startExec(params runParams) error {
}
ctx := params.ctx
log.VEvent(ctx, 2, "fast delete: skipping scan")
traceKV := params.p.ExtendedEvalContext().Tracing.KVTracingEnabled()
spans := make([]roachpb.Span, len(d.spans))
copy(spans, d.spans)
for len(spans) != 0 {
b := params.p.txn.NewBatch()
for _, span := range spans {
if traceKV {
log.VEventf(ctx, 2, "DelRange %s - %s", span.Key, span.EndKey)
if !d.autoCommitEnabled {
// Without autocommit, we're going to run each batch one by one, respecting
// a max span request keys size. We use spans as a queue of spans to delete.
// It'll be edited if there are any resume spans encountered (if any request
// hits the key limit).
for len(spans) != 0 {
b := params.p.txn.NewBatch()
d.deleteSpans(params, b, spans)
b.Header.MaxSpanRequestKeys = TableTruncateChunkSize
if err := params.p.txn.Run(ctx, b); err != nil {
return err
}
b.DelRange(span.Key, span.EndKey, true /* returnKeys */)
}
b.Header.MaxSpanRequestKeys = TableTruncateChunkSize

if err := params.p.txn.Run(ctx, b); err != nil {
spans = spans[:0]
var err error
if spans, err = d.processResults(b.Results, spans); err != nil {
return err
}
}
} else {
// With autocommit, we're going to run the deleteRange in a single batch
// without a limit, since limits and deleteRange aren't compatible with 1pc
// transactions / autocommit. This isn't inherently safe, because without a
// limit, this command could technically use up unlimited memory. However,
// the optimizer only enables autoCommit if the maximum possible number of
// keys to delete in this command is low, so this is safe in practice.
b := params.p.txn.NewBatch()
d.deleteSpans(params, b, spans)
if err := params.p.txn.CommitInBatch(ctx, b); err != nil {
return err
}

spans = spans[:0]
for _, r := range b.Results {
var prev []byte
for _, keyBytes := range r.Keys {
// If prefix is same, don't bother decoding key.
if len(prev) > 0 && bytes.HasPrefix(keyBytes, prev) {
continue
}

after, ok, err := d.fetcher.ReadIndexKey(keyBytes)
if err != nil {
return err
}
if !ok {
return errors.AssertionFailedf("key did not match descriptor")
}
k := keyBytes[:len(keyBytes)-len(after)]
if !bytes.Equal(k, prev) {
prev = k
d.rowCount++
}
}
if r.ResumeSpan != nil && r.ResumeSpan.Valid() {
spans = append(spans, *r.ResumeSpan)
}
if resumeSpans, err := d.processResults(b.Results, nil /* resumeSpans */); err != nil {
return err
} else if len(resumeSpans) != 0 {
// This shouldn't ever happen - we didn't pass a limit into the batch.
return errors.AssertionFailedf("deleteRange without a limit unexpectedly returned resumeSpans")
}
}

Expand All @@ -211,6 +216,53 @@ func (d *deleteRangeNode) startExec(params runParams) error {
return nil
}

// deleteSpans queues one DelRange request per input span onto the batch b.
// Every request asks for the deleted keys to be returned (returnKeys is
// true). When KV tracing is enabled for the session, each span is also logged
// as a verbosity-2 event before it is added to the batch.
func (d *deleteRangeNode) deleteSpans(params runParams, b *client.Batch, spans roachpb.Spans) {
	// Look up the tracing flag once; it doesn't change while we build the batch.
	kvTracing := params.p.ExtendedEvalContext().Tracing.KVTracingEnabled()
	for i := range spans {
		startKey, endKey := spans[i].Key, spans[i].EndKey
		if kvTracing {
			log.VEventf(params.ctx, 2, "DelRange %s - %s", startKey, endKey)
		}
		b.DelRange(startKey, endKey, true /* returnKeys */)
	}
}

// processResults walks the results of a batch of DelRange requests and bumps
// d.rowCount once for every distinct row found among the returned keys.
//
// Any valid resume span found on a result is appended to resumeSpans, and the
// (possibly grown) slice is returned. On error the returned slice is nil.
func (d *deleteRangeNode) processResults(
	results []client.Result, resumeSpans []roachpb.Span,
) (roachpb.Spans, error) {
	for i := range results {
		res := &results[i]

		// lastRowPrefix holds the index-key portion of the most recently counted
		// row within this result; it starts out empty for every result.
		var lastRowPrefix []byte
		for _, key := range res.Keys {
			// A key that extends the previous row's prefix is skipped without
			// being decoded.
			if len(lastRowPrefix) == 0 || !bytes.HasPrefix(key, lastRowPrefix) {
				rest, matched, err := d.fetcher.ReadIndexKey(key)
				if err != nil {
					return nil, err
				}
				if !matched {
					return nil, errors.AssertionFailedf("key did not match descriptor")
				}
				// Strip whatever ReadIndexKey left undecoded; what remains is the
				// prefix used to tell rows apart.
				rowPrefix := key[:len(key)-len(rest)]
				if !bytes.Equal(rowPrefix, lastRowPrefix) {
					lastRowPrefix = rowPrefix
					d.rowCount++
				}
			}
		}

		if span := res.ResumeSpan; span != nil && span.Valid() {
			resumeSpans = append(resumeSpans, *span)
		}
	}
	return resumeSpans, nil
}

// Next implements the planNode interface.
func (*deleteRangeNode) Next(params runParams) (bool, error) {
panic("invalid")
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/opt/bench/stub_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,11 @@ func (f *stubFactory) ConstructDelete(
}

func (f *stubFactory) ConstructDeleteRange(
table cat.Table, needed exec.ColumnOrdinalSet, indexConstraint *constraint.Constraint,
table cat.Table,
needed exec.ColumnOrdinalSet,
indexConstraint *constraint.Constraint,
maxReturnedKeys int,
allowAutoCommit bool,
) (exec.Node, error) {
return struct{}{}, nil
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,12 @@ func (b *Builder) buildDeleteRange(del *memo.DeleteExpr) (execPlan, error) {
scan := del.Input.(*memo.ScanExpr)
tab := b.mem.Metadata().Table(scan.Table)
needed, _ := b.getColumns(scan.Cols, scan.Table)

root, err := b.factory.ConstructDeleteRange(tab, needed, scan.Constraint)
// Calculate the maximum number of keys that the scan could return by
// multiplying the number of possible result rows by the number of column
// families of the table. The exec factory needs this information to determine
// whether or not autoCommit can be enabled.
maxKeys := int(b.indexConstraintMaxResults(scan)) * tab.FamilyCount()
root, err := b.factory.ConstructDeleteRange(tab, needed, scan.Constraint, maxKeys, b.autoCommit)
if err != nil {
return execPlan{}, err
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/delete_range
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# LogicTest: local

statement ok
CREATE TABLE a (a INT PRIMARY KEY)

# Delete range operates in chunks of 600 (defined by sql.TableTruncateChunkSize).
statement ok
INSERT INTO a SELECT * FROM generate_series(1,1000)

statement ok
SET tracing = on,kv; DELETE FROM a; SET tracing = off

# Ensure that DelRange requests are chunked for DELETE FROM...
query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE message LIKE '%DelRange%' OR message LIKE '%sending batch%'
----
flow DelRange /Table/53/1 - /Table/53/2
dist sender send r24: sending batch 1 DelRng to (n1,s1):1
flow DelRange /Table/53/1/601/0 - /Table/53/2
dist sender send r24: sending batch 1 DelRng to (n1,s1):1
dist sender send r24: sending batch 1 EndTxn to (n1,s1):1

# Ensure that DelRange requests are autocommitted when DELETE FROM happens on a
# chunk of fewer than 600 keys.

statement ok
INSERT INTO a VALUES(5)

statement ok
SET tracing = on,kv; DELETE FROM a WHERE a = 5; SET tracing = off

query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE message LIKE '%DelRange%' OR message LIKE '%sending batch%'
----
flow DelRange /Table/53/1/5 - /Table/53/1/5/#
dist sender send r24: sending batch 1 DelRng, 1 EndTxn to (n1,s1):1
7 changes: 2 additions & 5 deletions pkg/sql/opt/exec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,8 @@ type Factory interface {
// possible when certain conditions hold true (see canUseDeleteRange for more
// details). See the comment for ConstructScan for descriptions of the
// parameters, since DeleteRange combines Delete + Scan into a single operator.
ConstructDeleteRange(
table cat.Table,
needed ColumnOrdinalSet,
indexConstraint *constraint.Constraint,
) (Node, error)
ConstructDeleteRange(table cat.Table, needed ColumnOrdinalSet, indexConstraint *constraint.Constraint,
maxReturnedKeys int, allowAutoCommit bool) (Node, error)

// ConstructCreateTable returns a node that implements a CREATE TABLE
// statement.
Expand Down
25 changes: 24 additions & 1 deletion pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1787,7 +1787,11 @@ func (ef *execFactory) ConstructDelete(
}

func (ef *execFactory) ConstructDeleteRange(
table cat.Table, needed exec.ColumnOrdinalSet, indexConstraint *constraint.Constraint,
table cat.Table,
needed exec.ColumnOrdinalSet,
indexConstraint *constraint.Constraint,
maxReturnedKeys int,
allowAutoCommit bool,
) (exec.Node, error) {
tabDesc := table.(*optTable).desc
indexDesc := &tabDesc.PrimaryIndex
Expand All @@ -1805,10 +1809,29 @@ func (ef *execFactory) ConstructDeleteRange(
return nil, err
}

// Permitting autocommit in DeleteRange is very important, because DeleteRange
// is used for simple deletes from primary indexes like
// DELETE FROM t WHERE key = 1000
// When possible, we need to make this a 1pc transaction for performance
// reasons. At the same time, we have to be careful, because DeleteRange
// returns all of the keys that it deleted - so we have to set a limit on the
// DeleteRange request. But, trying to set autocommit and a limit on the
// request doesn't work properly if the limit is hit. So, we permit autocommit
// here if we can guarantee that the number of returned keys is finite and
// relatively small.
autoCommitEnabled := allowAutoCommit && ef.planner.autoCommit
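// If maxReturnedKeys is 0, it indicates that we weren't able to determine
// the maximum number of returned keys, so we'll give up and not permit
// autocommit. Likewise, if it exceeds TableTruncateChunkSize, we don't
// consider the number of returned keys small enough to permit autocommit.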
if maxReturnedKeys == 0 || maxReturnedKeys > TableTruncateChunkSize {
autoCommitEnabled = false
}

return &deleteRangeNode{
interleavedFastPath: false,
spans: spans,
desc: tabDesc,
autoCommitEnabled: autoCommitEnabled,
}, nil
}

Expand Down

0 comments on commit 918d925

Please sign in to comment.