Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.3: colexec: harden eager cancellation in parallel unordered sync #134609

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ go_library(
"//pkg/sql/sqltelemetry", # keep
"//pkg/sql/types",
"//pkg/util/buildutil",
"//pkg/util/cancelchecker",
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"//pkg/util/intsets",
Expand Down
51 changes: 47 additions & 4 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package colexec
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -171,7 +173,9 @@ func NewParallelUnorderedSynchronizer(
// be able to distinguish the benign context cancellation error from a
// true query execution error, so it can "poison" the query execution if
// the child sync hasn't transitioned into the draining mode when we
// perform the eager cancellation.
// perform the eager cancellation. The child sync also won't distinguish
// between the benign context cancellation and the flow cancellation, so
// it might not collect the metadata from its inputs when it should.
allowEagerCancellationOnDrain = true
for _, input := range inputs {
if hasParallelUnorderedSync(input.Root) {
Expand Down Expand Up @@ -462,6 +466,45 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
}
}

bufferMeta := func(meta []execinfrapb.ProducerMetadata) {
if !s.flowCtx.Local {
s.bufferedMeta = append(s.bufferedMeta, meta...)
return
}
// Given that the synchronizer is draining, it is safe to ignore all
// context cancellation errors in the metadata for local plans. This is
// the case because:
// - if the query should result in an error, then some other error was
// already propagated to the client, and this was the reason for why we
// transitioned into draining;
// - if the query should be successful, yet we have some pending context
// cancellation errors, then it must be the case that query execution
// was short-circuited (e.g. because of the LIMIT), so we can pretend
// the part of the execution that hit the pending error didn't happen
// since clearly it wasn't necessary to compute the query result.
//
// Note that we cannot ignore all errors here since some of them (like
// ReadWithinUncertaintyIntervalError) could poison the txn and need to
// be propagated to the client, so we only swallow the cancellation
// errors here.
// TODO(yuzefovich): the txn could be poisoned even by errors that we're
// swallowing. I think we could (and perhaps should) swallow all errors
// here.
for _, m := range meta {
if m.Err == nil ||
// This is ugly, but the context cancellation if observed in the
// KV layer can result in kvpb errors that don't satisfy
// errors.Is check (because they don't serialize the original
// error), so we have this string matching instead.
(!strings.Contains(m.Err.Error(), context.Canceled.Error()) &&
// If the cancellation is observed by the CancelChecker,
// then it propagates a QueryCanceledError.
!errors.Is(m.Err, cancelchecker.QueryCanceledError)) {
s.bufferedMeta = append(s.bufferedMeta, m)
}
}
}

// Non-blocking drain of batchCh. This is important mostly because of the
// following edge case: all n inputs have pushed batches to the batchCh, so
// there are currently n messages. Next notifies the last read input to
Expand All @@ -478,7 +521,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
if msg == nil {
batchChDrained = true
} else if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
bufferMeta(msg.meta)
}
default:
batchChDrained = true
Expand All @@ -497,15 +540,15 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
// Drain the batchCh, this reads the metadata that was pushed.
for msg := <-s.batchCh; msg != nil; msg = <-s.batchCh {
if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
bufferMeta(msg.meta)
}
}

// Buffer any errors that may have happened without blocking on the channel.
for exitLoop := false; !exitLoop; {
select {
case err := <-s.errCh:
s.bufferedMeta = append(s.bufferedMeta, execinfrapb.ProducerMetadata{Err: err})
bufferMeta([]execinfrapb.ProducerMetadata{{Err: err}})
default:
exitLoop = true
}
Expand Down
39 changes: 37 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/union
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,50 @@ CREATE TABLE t127043_2 (k2 INT, v2 INT, INDEX (k2));
INSERT INTO t127043_2 VALUES (1, 1);
CREATE TABLE t127043_3 (k3 INT, v3 INT, INDEX (k3));
INSERT INTO t127043_3 VALUES (1, 1);
CREATE VIEW v127043 (k, v) AS
CREATE VIEW v127043_3 (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2
UNION SELECT
k3 AS k, v3 AS v FROM t127043_3;
CREATE VIEW v127043_3_idx (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1@t127043_1_k1_idx
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2@t127043_2_k2_idx
UNION SELECT
k3 AS k, v3 AS v FROM t127043_3@t127043_3_k3_idx;
CREATE VIEW v127043_2 (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2;

statement ok
ANALYZE t127043_1;

statement ok
ANALYZE t127043_2;

statement ok
ANALYZE t127043_3;

# Scan and filter in all UNION branches.
query II
SELECT k, v FROM v127043_3 WHERE k = 1 LIMIT 1;
----
1 1

# Scan and index join in all UNION branches.
query II
SELECT k, v FROM v127043_3_idx WHERE k = 1 LIMIT 1;
----
1 1

# Hash join with two UNION branches (with a scan and an index join) on one side
# and a scan on the other.
query II
SELECT k, v FROM v127043 WHERE k = 1 LIMIT 1;
SELECT k, v FROM v127043_2 INNER JOIN t127043_3 ON k = k3 WHERE k = 1 LIMIT 1;
----
1 1