37199: storage: propagate errors from contentionQueue, catch stalls in roachtest r=nvanbenschoten a=nvanbenschoten

Informs #36089.

The PR is split into a series of commits. The first fixes part of a bug that was causing #36089 to fail (thanks to #36748), and the second improves the test so that this class of bug has a more obvious failure condition in the future. The third, fifth, and sixth clean up code. Finally, the fourth fixes another bug that could cause issues with #36089.

Before the first commit, a request could get stuck repeatedly attempting to push a transaction, only to find each time that its own transaction had already been aborted. The error never propagated up to the transaction coordinator, so the request remained stuck. The first commit fixes this by correctly propagating errors observed by the `contentionQueue`.
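
As a rough illustration of the shape of that fix, here is a minimal, self-contained sketch; the names (`pushOnce`, `waitInContentionQueue`, `errTxnAborted`) are invented for this example and are not CockroachDB's actual `contentionQueue` API:

```go
// Hypothetical sketch only; these names are placeholders, not the real API.
package main

import (
	"context"
	"errors"
	"fmt"
)

// errTxnAborted stands in for the "pusher was already aborted" error that a
// waiter can observe while pushing a conflicting transaction.
var errTxnAborted = errors.New("transaction aborted: pusher already aborted")

// pushOnce simulates a single push attempt. In this toy version the second
// attempt discovers that the pushing transaction itself has been aborted.
func pushOnce(ctx context.Context, attempt int) (done bool, err error) {
	if err := ctx.Err(); err != nil {
		return false, err
	}
	if attempt >= 1 {
		return false, errTxnAborted
	}
	return false, nil
}

// waitInContentionQueue shows the shape of the fix: an error observed while
// waiting is returned to the caller so the transaction coordinator can react,
// rather than being dropped while the push is retried forever.
func waitInContentionQueue(ctx context.Context) error {
	for attempt := 0; ; attempt++ {
		done, err := pushOnce(ctx, attempt)
		if err != nil {
			return err // previously this was effectively swallowed
		}
		if done {
			return nil
		}
	}
}

func main() {
	fmt.Println(waitInContentionQueue(context.Background()))
}
```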

The second commit bumps the TxnLivenessThreshold for clusters running `kv/contention/nodes=4` to 10 minutes. This is large enough that if a transaction is ever abandoned, all other transactions will begin waiting on it, throughput will drop to 0 for 10 straight minutes, and the test will fail to achieve its minimum QPS requirement.
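
For reference, the roachtest change later in this diff sets `COCKROACH_TXN_LIVENESS_HEARTBEAT_MULTIPLIER=600`, and `base.DefaultTxnHeartbeatInterval` is 1 second, which is where the 10-minute threshold comes from. A trivial sketch of that arithmetic:

```go
// Quick arithmetic check of the 10-minute figure, using the two values that
// appear in this diff: a multiplier of 600 and a 1s default txn heartbeat.
package main

import (
	"fmt"
	"time"
)

func main() {
	const heartbeatMultiplier = 600                     // COCKROACH_TXN_LIVENESS_HEARTBEAT_MULTIPLIER
	const defaultTxnHeartbeatInterval = 1 * time.Second // base.DefaultTxnHeartbeatInterval
	livenessThreshold := heartbeatMultiplier * defaultTxnHeartbeatInterval
	fmt.Println(livenessThreshold) // 10m0s: how long waiters stall on an abandoned txn
}
```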

The fourth commit instructs a pusher in the `txnwait.Queue` to inform all other pushers waiting on the same transaction when it observes that the pushee is ABORTED. I never saw this cause issues with #36089, but it seems very possible that it could, given frequent tscache rotations.
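
One common Go pattern for this kind of "first observer wakes everyone" notification is closing a shared channel; the sketch below is purely illustrative and does not mirror the real `txnwait.Queue` internals:

```go
// Illustrative sketch only -- a generic broadcast pattern, not txnwait.Queue.
package main

import (
	"fmt"
	"sync"
)

// pendingTxn tracks pushers waiting on one pushee transaction.
type pendingTxn struct {
	once    sync.Once
	aborted chan struct{} // closed once any pusher observes the pushee as ABORTED
}

func newPendingTxn() *pendingTxn {
	return &pendingTxn{aborted: make(chan struct{})}
}

// notifyAborted is idempotent: the first pusher to see the ABORTED status
// releases every other pusher queued on the same transaction.
func (p *pendingTxn) notifyAborted() {
	p.once.Do(func() { close(p.aborted) })
}

func main() {
	p := newPendingTxn()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-p.aborted // each pusher blocks here until the pushee is known aborted
			fmt.Printf("pusher %d unblocked\n", id)
		}(i)
	}
	p.notifyAborted()
	wg.Wait()
}
```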

38397: sql: deflake TestLogic//crdb_internal/max_retry_counter r=knz a=knz

Fixes #38062.

Release note: None

38654: exec: Handle NULLS in TopK sorter r=rohany a=rohany

This commit fixes NULL handling in the TopK sorter by avoiding the Vec Copy
method, which has a bug. Instead, we add a set method to the vec comparator
and use the templatized comparator to perform the sets that the TopK sorter
needs.

To facilitate this, we add an UnsetNull method to the Nulls object. However,
use of this method can result in HasNulls() returning true even when the
vector no longer contains any nulls. This behavior already occurs when
selection vectors are used. Based on discussions with @solongordon and
@asubiotto, this behavior is OK; future PRs will attempt to improve it and
address the bugs within the Vec Copy method.
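
The reason `HasNulls()` can keep returning true is that it is backed by a cheap maybe-has-nulls flag that `SetNull` raises but `UnsetNull` does not recompute. The toy model below assumes such a flag (the field names are made up); the bit convention follows the `SetNull64`/`UnsetNull64` code later in this diff, where a cleared bit means NULL:

```go
// Toy model of the behavior described above; field names are assumptions.
package main

import "fmt"

type nulls struct {
	maybeHasNulls bool    // cheap flag: "this vector may contain nulls"
	bits          []uint8 // per-value bitmap; a 0 bit means NULL
}

func newNulls(n int) *nulls {
	b := make([]uint8, (n+7)/8)
	for i := range b {
		b[i] = 0xff // start with every value non-null
	}
	return &nulls{bits: b}
}

func (n *nulls) SetNull(i int) {
	n.maybeHasNulls = true
	n.bits[i/8] &^= uint8(1) << uint(i%8)
}

// UnsetNull clears the null bit but deliberately leaves maybeHasNulls alone;
// recounting the whole bitmap on every unset would be too expensive.
func (n *nulls) UnsetNull(i int) {
	n.bits[i/8] |= uint8(1) << uint(i%8)
}

func (n *nulls) HasNulls() bool { return n.maybeHasNulls }

func main() {
	n := newNulls(16)
	n.SetNull(3)
	n.UnsetNull(3)
	fmt.Println(n.HasNulls()) // true, even though no value is NULL anymore
}
```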

38725: cli/dump: more clearly inform the user upon tables with no visible columns r=knz a=knz

Informs #37768.
Informs #28948.
This has been coming up quite often in support, lately again on Gitter and on the forum: https://forum.cockroachlabs.com/t/error-while-dumping-core-backup/2901/3.

This PR aims to lessen the burden on support and to propose a clear "next action" for the user.

Before:

```
kena@kenax ~/cockroach % ./cockroach dump --insecure defaultdb
CREATE TABLE t (,
        FAMILY "primary" (rowid)
);
Error: pq: at or near "from": syntax error
Failed running "dump"
```

After:

```
kena@kenax ~/cockroach % ./cockroach dump --insecure defaultdb
CREATE TABLE t (,
        FAMILY "primary" (rowid)
);
Error: table "defaultdb.public.t" has no visible columns
HINT: To proceed with the dump, either omit this table from the list of tables to dump, drop the table, or add some visible columns.
--
See: #37768
Failed running "dump"
```

Release note (cli change): `cockroach dump` will now more clearly
refer to issue #37768 when it encounters a table with no visible
columns, which (currently) cannot be dumped successfully.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Raphael 'kena' Poss <knz@cockroachlabs.com>
Co-authored-by: Rohan Yadav <rohany@alumni.cmu.edu>
4 people committed Jul 8, 2019
5 parents ff1faaa + 82aeef4 + 6d95a15 + 3c0aa0b + d4b257b commit 3798cc8
Showing 30 changed files with 458 additions and 140 deletions.
12 changes: 6 additions & 6 deletions pkg/base/config.go
@@ -57,9 +57,9 @@ const (
// leader lease active duration should be of the raft election timeout.
defaultRangeLeaseRaftElectionTimeoutMultiplier = 3

// defaultHeartbeatInterval is the default value of HeartbeatInterval used
// by the rpc context.
defaultHeartbeatInterval = 3 * time.Second
// defaultRPCHeartbeatInterval is the default value of RPCHeartbeatInterval
// used by the rpc context.
defaultRPCHeartbeatInterval = 3 * time.Second

// rangeLeaseRenewalFraction specifies what fraction the range lease
// renewal duration should be of the range lease active time. For example,
@@ -181,10 +181,10 @@ type Config struct {
// See the comment in server.Config for more details.
HistogramWindowInterval time.Duration

// HeartbeatInterval controls how often a Ping request is sent on peer
// RPCHeartbeatInterval controls how often a Ping request is sent on peer
// connections to determine connection health and update the local view
// of remote clocks.
HeartbeatInterval time.Duration
RPCHeartbeatInterval time.Duration
}

func wrapError(err error) error {
@@ -207,7 +207,7 @@ func (cfg *Config) InitDefaults() {
cfg.HTTPAddr = defaultHTTPAddr
cfg.SSLCertsDir = DefaultCertsDirectory
cfg.certificateManager = lazyCertificateManager{}
cfg.HeartbeatInterval = defaultHeartbeatInterval
cfg.RPCHeartbeatInterval = defaultRPCHeartbeatInterval
}

// HTTPRequestScheme returns "http" or "https" based on the value of Insecure.
6 changes: 3 additions & 3 deletions pkg/base/constants.go
@@ -19,12 +19,12 @@ const (
// for more on this setting.
DefaultMaxClockOffset = 500 * time.Millisecond

// DefaultHeartbeatInterval is how often heartbeats are sent from the
// DefaultTxnHeartbeatInterval is how often heartbeats are sent from the
// transaction coordinator to a live transaction. These keep it from
// being preempted by other transactions writing the same keys. If a
// transaction fails to be heartbeat within 2x the heartbeat interval,
// transaction fails to be heartbeat within 5x the heartbeat interval,
// it may be aborted by conflicting txns.
DefaultHeartbeatInterval = 1 * time.Second
DefaultTxnHeartbeatInterval = 1 * time.Second

// SlowRequestThreshold is the amount of time to wait before considering a
// request to be "slow".
12 changes: 12 additions & 0 deletions pkg/cli/cli.go
@@ -65,7 +65,15 @@ func Main() {

errCode := 0
if err := Run(os.Args[1:]); err != nil {
// Display the error and its details/hints.
fmt.Fprintln(stderr, "Error:", err.Error())
maybeShowErrorDetails(stderr, err, false /* printNewline */)

// Remind the user of which command was being run.
fmt.Fprintf(stderr, "Failed running %q\n", cmdName)

// Finally, extract the error code, as optionally specified
// by the sub-command.
errCode = 1
var cliErr *cliError
if errors.As(err, &cliErr) {
@@ -158,6 +166,10 @@ var cockroachCmd = &cobra.Command{
// Commands should manually print usage information when the error is,
// in fact, a result of a bad invocation, e.g. too many arguments.
SilenceUsage: true,
// Disable automatic printing of the error. We want to also print
// details and hints, which cobra does not do for us. Instead
// we do the printing in Main().
SilenceErrors: true,
}

func init() {
28 changes: 26 additions & 2 deletions pkg/cli/cli_test.go
@@ -380,6 +380,7 @@ func (c cliTest) runWithArgsUnredirected(origArgs []string) {
return Run(args)
}(); err != nil {
fmt.Println(err)
maybeShowErrorDetails(os.Stdout, err, false /*printNewLine*/)
}
}

@@ -1857,8 +1858,11 @@ Use "cockroach [command] --help" for more information about a command.
done <- err
}()

if err := Run(test.flags); err != nil && !test.expErr {
t.Error(err)
if err := Run(test.flags); err != nil {
fmt.Fprintln(w, "Error:", err)
if !test.expErr {
t.Error(err)
}
}

// back to normal state
@@ -2552,3 +2556,23 @@ func Example_sqlfmt() {
// sqlfmt --no-simplify -e select (1+2)+3
// SELECT (1 + 2) + 3
}

func Example_dump_no_visible_columns() {
c := newCLITest(cliTestParams{})
defer c.cleanup()

c.RunWithArgs([]string{"sql", "-e", "create table t(x int); set sql_safe_updates=false; alter table t drop x"})
c.RunWithArgs([]string{"dump", "defaultdb"})

// Output:
// sql -e create table t(x int); set sql_safe_updates=false; alter table t drop x
// ALTER TABLE
// dump defaultdb
// CREATE TABLE t (,
// FAMILY "primary" (rowid)
// );
// table "defaultdb.public.t" has no visible columns
// HINT: To proceed with the dump, either omit this table from the list of tables to dump, drop the table, or add some visible columns.
// --
// See: https://github.com/cockroachdb/cockroach/issues/37768
}
16 changes: 15 additions & 1 deletion pkg/cli/dump.go
@@ -27,7 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/timeofday"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/pkg/errors"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
)

@@ -536,6 +536,20 @@ func dumpTableData(w io.Writer, conn *sqlConn, clusterTS string, bmd basicMetada
return err
}

if strings.TrimSpace(md.columnNames) == "" {
// A table with no columns may still have one or more rows. In
// fact, it can have arbitrary many rows, each with a different
// (hidden) PK value. Unfortunately, the dump command today simply
// omits the hidden PKs from the dump, so it is not possible to
// restore the invisible values.
// Instead of failing with an incomprehensible error below, inform
// the user more clearly.
err := errors.Newf("table %q has no visible columns", tree.ErrString(bmd.name))
err = errors.WithHint(err, "To proceed with the dump, either omit this table from the list of tables to dump, drop the table, or add some visible columns.")
err = errors.WithIssueLink(err, errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/37768"})
return err
}

bs := fmt.Sprintf("SELECT %s FROM %s AS OF SYSTEM TIME %s ORDER BY PRIMARY KEY %[2]s",
md.columnNames,
md.name,
9 changes: 8 additions & 1 deletion pkg/cmd/roachtest/kv.go
@@ -152,12 +152,19 @@ func registerKV(r *testRegistry) {
func registerKVContention(r *testRegistry) {
const nodes = 4
r.Add(testSpec{
Skip: "https://github.com/cockroachdb/cockroach/issues/36089",
Name: fmt.Sprintf("kv/contention/nodes=%d", nodes),
Cluster: makeClusterSpec(nodes + 1),
Run: func(ctx context.Context, t *test, c *cluster) {
c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
c.Put(ctx, workload, "./workload", c.Node(nodes+1))
c.Start(ctx, t, c.Range(1, nodes))

// Start the cluster with an extremely high txn liveness threshold.
// If requests ever get stuck on a transaction that was abandoned
// then it will take 10m for them to get unstuck, at which point the
// QPS threshold check in the test is guaranteed to fail.
args := startArgs("--env=COCKROACH_TXN_LIVENESS_HEARTBEAT_MULTIPLIER=600")
c.Start(ctx, t, args, c.Range(1, nodes))

// Enable request tracing, which is a good tool for understanding
// how different transactions are interacting.
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
@@ -265,7 +265,7 @@ func (r *Registry) LoadJobWithTxn(ctx context.Context, jobID int64, txn *client.

// DefaultCancelInterval is a reasonable interval at which to poll this node
// for liveness failures and cancel running jobs.
var DefaultCancelInterval = base.DefaultHeartbeatInterval
var DefaultCancelInterval = base.DefaultTxnHeartbeatInterval

// DefaultAdoptInterval is a reasonable interval at which to poll system.jobs
// for jobs with expired leases.
2 changes: 1 addition & 1 deletion pkg/kv/txn_coord_sender.go
@@ -455,7 +455,7 @@ func NewTxnCoordSenderFactory(
tcf.st = cluster.MakeTestingClusterSettings()
}
if tcf.heartbeatInterval == 0 {
tcf.heartbeatInterval = base.DefaultHeartbeatInterval
tcf.heartbeatInterval = base.DefaultTxnHeartbeatInterval
}
if tcf.metrics == (TxnMetrics{}) {
tcf.metrics = MakeTxnMetrics(metric.TestSampleInterval)
2 changes: 1 addition & 1 deletion pkg/rpc/context.go
@@ -437,7 +437,7 @@ func NewContext(
var cancel context.CancelFunc
ctx.masterCtx, cancel = context.WithCancel(ambient.AnnotateCtx(context.Background()))
ctx.Stopper = stopper
ctx.heartbeatInterval = baseCtx.HeartbeatInterval
ctx.heartbeatInterval = baseCtx.RPCHeartbeatInterval
ctx.RemoteClocks = newRemoteClockMonitor(
ctx.LocalClock, 10*ctx.heartbeatInterval, baseCtx.HistogramWindowInterval)
ctx.heartbeatTimeout = 2 * ctx.heartbeatInterval
2 changes: 1 addition & 1 deletion pkg/rpc/nodedialer/nodedialer_test.go
@@ -232,7 +232,7 @@ func newTestServer(
func newTestContext(clock *hlc.Clock, stopper *stop.Stopper) *rpc.Context {
cfg := testutils.NewNodeTestBaseContext()
cfg.Insecure = true
cfg.HeartbeatInterval = 10 * time.Millisecond
cfg.RPCHeartbeatInterval = 10 * time.Millisecond
rctx := rpc.NewContext(
log.AmbientContext{Tracer: tracing.NewTracer()},
cfg,
10 changes: 10 additions & 0 deletions pkg/sql/exec/coldata/nulls.go
@@ -70,6 +70,11 @@ func (n *Nulls) SetNull(i uint16) {
n.SetNull64(uint64(i))
}

// UnsetNull unsets the ith value of the column.
func (n *Nulls) UnsetNull(i uint16) {
n.UnsetNull64(uint64(i))
}

// SetNullRange sets all the values in [start, end) to null.
func (n *Nulls) SetNullRange(start uint64, end uint64) {
if start >= end {
@@ -146,6 +151,11 @@ func (n *Nulls) SetNull64(i uint64) {
n.nulls[i/8] &= flippedBitMask[i%8]
}

// UnsetNull64 unsets the ith values of the column.
func (n *Nulls) UnsetNull64(i uint64) {
n.nulls[i/8] |= bitMask[i%8]
}

// Extend extends the nulls vector with the next toAppend values from src,
// starting at srcStartIdx.
func (n *Nulls) Extend(src *Nulls, destStartIdx uint64, srcStartIdx uint16, toAppend uint16) {
12 changes: 12 additions & 0 deletions pkg/sql/exec/coldata/nulls_test.go
@@ -93,6 +93,18 @@ func TestSetAndUnsetNulls(t *testing.T) {
for i := uint16(0); i < BatchSize; i++ {
require.True(t, n.NullAt(i))
}

for i := uint16(0); i < BatchSize; i += 3 {
n.UnsetNull(i)
}
for i := uint16(0); i < BatchSize; i++ {
if i%3 == 0 {
require.False(t, n.NullAt(i))
} else {
require.True(t, n.NullAt(i))
}
}

n.UnsetNulls()
for i := uint16(0); i < BatchSize; i++ {
require.False(t, n.NullAt(i))
27 changes: 7 additions & 20 deletions pkg/sql/exec/sorttopk.go
@@ -75,10 +75,9 @@ type topKSorter struct {
func (t *topKSorter) Init() {
t.input.Init()
t.topK = coldata.NewMemBatchWithSize(t.inputTypes, int(t.k))
t.comparators = make([]vecComparator, len(t.orderingCols))
for i := range t.orderingCols {
typ := t.inputTypes[t.orderingCols[i].ColIdx]
// one vec for output batch and one for current input batch
t.comparators = make([]vecComparator, len(t.inputTypes))
for i := range t.inputTypes {
typ := t.inputTypes[i]
t.comparators[i] = GetVecComparator(typ, 2)
}
t.output = coldata.NewMemBatchWithSize(t.inputTypes, coldata.BatchSize)
@@ -161,18 +160,7 @@ func (t *topKSorter) spool(ctx context.Context) {
maxIdx := t.heap[0]
if t.compareRow(inputVecIdx, topKVecIdx, idx, maxIdx) < 0 {
for j := range t.inputTypes {
// TODO(solon): Make this copy more efficient, perhaps by adding a
// copy method to the vecComparator interface. This would avoid
// needing to switch on the column type every time.
t.topK.ColVec(j).Copy(
coldata.CopyArgs{
ColType: t.inputTypes[j],
Src: inputBatch.ColVec(j),
DestIdx: uint64(maxIdx),
SrcStartIdx: uint64(idx),
SrcEndIdx: uint64(idx + 1),
},
)
t.comparators[j].set(inputVecIdx, topKVecIdx, idx, maxIdx)
}
heap.Fix(t, 0)
}
@@ -221,7 +209,7 @@ func (t *topKSorter) emit() coldata.Batch {
func (t *topKSorter) compareRow(vecIdx1, vecIdx2 int, rowIdx1, rowIdx2 uint16) int {
for i := range t.orderingCols {
info := t.orderingCols[i]
res := t.comparators[i].compare(vecIdx1, vecIdx2, rowIdx1, rowIdx2)
res := t.comparators[info.ColIdx].compare(vecIdx1, vecIdx2, rowIdx1, rowIdx2)
if res != 0 {
switch d := info.Direction; d {
case distsqlpb.Ordering_Column_ASC:
@@ -237,9 +225,8 @@ func (t *topKSorter) compareRow(vecIdx1, vecIdx2 int, rowIdx1, rowIdx2 uint16) i
}

func (t *topKSorter) updateComparators(vecIdx int, batch coldata.Batch) {
for i := range t.orderingCols {
vec := batch.ColVec(int(t.orderingCols[i].ColIdx))
t.comparators[i].setVec(vecIdx, vec)
for i := range t.inputTypes {
t.comparators[i].setVec(vecIdx, batch.ColVec(i))
}
}

4 changes: 2 additions & 2 deletions pkg/sql/exec/sorttopk_test.go
@@ -47,8 +47,8 @@ func TestTopKSorter(t *testing.T) {
},
{
name: "nulls",
tuples: tuples{{1}, {2}, {3}, {4}, {5}, {6}, {7}, {nil}},
expected: tuples{{nil}, {1}, {2}},
tuples: tuples{{1}, {2}, {nil}, {3}, {4}, {5}, {6}, {7}, {nil}},
expected: tuples{{nil}, {nil}, {1}},
typ: []types.T{types.Int64},
ordCols: []distsqlpb.Ordering_Column{{ColIdx: 0}},
k: 3,
4 changes: 4 additions & 0 deletions pkg/sql/exec/vec_comparators.go
@@ -21,6 +21,10 @@ type vecComparator interface {
// 0, or 1.
compare(vecIdx1, vecIdx2 int, valIdx1, valIdx2 uint16) int

// set sets the value of the vector at dstVecIdx at index dstValIdx to the value
// at the vector at srcVecIdx at index srcValIdx.
set(srcVecIdx, dstVecIdx int, srcValIdx, dstValIdx uint16)

// setVec updates the vector at idx.
setVec(idx int, vec coldata.Vec)
}
9 changes: 9 additions & 0 deletions pkg/sql/exec/vec_comparators_tmpl.go
@@ -78,6 +78,15 @@ func (c *_TYPEVecComparator) setVec(idx int, vec coldata.Vec) {
c.nulls[idx] = vec.Nulls()
}

func (c *_TYPEVecComparator) set(srcVecIdx, dstVecIdx int, srcIdx, dstIdx uint16) {
if c.nulls[srcVecIdx].HasNulls() && c.nulls[srcVecIdx].NullAt(srcIdx) {
c.nulls[dstVecIdx].SetNull(dstIdx)
} else {
c.nulls[dstVecIdx].UnsetNull(dstIdx)
c.vecs[dstVecIdx][dstIdx] = c.vecs[srcVecIdx][srcIdx]
}
}

// {{end}}

func GetVecComparator(t types.T, numVecs int) vecComparator {