storage: propagate errors from contentionQueue, catch stalls in roachtest #37199

Merged: 7 commits, Jul 8, 2019
12 changes: 6 additions & 6 deletions pkg/base/config.go
@@ -57,9 +57,9 @@ const (
// leader lease active duration should be of the raft election timeout.
defaultRangeLeaseRaftElectionTimeoutMultiplier = 3

// defaultHeartbeatInterval is the default value of HeartbeatInterval used
// by the rpc context.
defaultHeartbeatInterval = 3 * time.Second
// defaultRPCHeartbeatInterval is the default value of RPCHeartbeatInterval
// used by the rpc context.
defaultRPCHeartbeatInterval = 3 * time.Second

// rangeLeaseRenewalFraction specifies what fraction the range lease
// renewal duration should be of the range lease active time. For example,
@@ -181,10 +181,10 @@ type Config struct {
// See the comment in server.Config for more details.
HistogramWindowInterval time.Duration

// HeartbeatInterval controls how often a Ping request is sent on peer
// RPCHeartbeatInterval controls how often a Ping request is sent on peer
// connections to determine connection health and update the local view
// of remote clocks.
HeartbeatInterval time.Duration
RPCHeartbeatInterval time.Duration
}

func wrapError(err error) error {
@@ -207,7 +207,7 @@ func (cfg *Config) InitDefaults() {
cfg.HTTPAddr = defaultHTTPAddr
cfg.SSLCertsDir = DefaultCertsDirectory
cfg.certificateManager = lazyCertificateManager{}
cfg.HeartbeatInterval = defaultHeartbeatInterval
cfg.RPCHeartbeatInterval = defaultRPCHeartbeatInterval
}

// HTTPRequestScheme returns "http" or "https" based on the value of Insecure.

6 changes: 3 additions & 3 deletions pkg/base/constants.go
@@ -19,12 +19,12 @@ const (
// for more on this setting.
DefaultMaxClockOffset = 500 * time.Millisecond

// DefaultHeartbeatInterval is how often heartbeats are sent from the
// DefaultTxnHeartbeatInterval is how often heartbeats are sent from the
// transaction coordinator to a live transaction. These keep it from
// being preempted by other transactions writing the same keys. If a
// transaction fails to be heartbeat within 2x the heartbeat interval,
// transaction fails to be heartbeat within 5x the heartbeat interval,
// it may be aborted by conflicting txns.
DefaultHeartbeatInterval = 1 * time.Second
DefaultTxnHeartbeatInterval = 1 * time.Second

// SlowRequestThreshold is the amount of time to wait before considering a
// request to be "slow".
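
Note: taken together with the store.go hunk at the bottom of this diff, these renames split what was previously a single ambiguous "heartbeat interval" name into three distinct knobs. A side-by-side sketch for reference (values copied from the hunks in this PR; grouping them into one block is illustrative only, they live in different packages):

```go
package example // illustrative sketch only; values copied from the hunks in this PR

import "time"

const (
	// Transaction-record heartbeats sent by the txn coordinator (pkg/base/constants.go).
	DefaultTxnHeartbeatInterval = 1 * time.Second
	// Ping interval on peer RPC connections, used by the rpc context (pkg/base/config.go).
	defaultRPCHeartbeatInterval = 3 * time.Second
	// Raft heartbeat interval, expressed in raft ticks rather than a duration (pkg/storage/store.go).
	defaultRaftHeartbeatIntervalTicks = 5
)
```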

9 changes: 8 additions & 1 deletion pkg/cmd/roachtest/kv.go
@@ -152,12 +152,19 @@ func registerKV(r *testRegistry) {
func registerKVContention(r *testRegistry) {
const nodes = 4
r.Add(testSpec{
Skip: "https://github.com/cockroachdb/cockroach/issues/36089",
Name: fmt.Sprintf("kv/contention/nodes=%d", nodes),
Cluster: makeClusterSpec(nodes + 1),
Run: func(ctx context.Context, t *test, c *cluster) {
c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
c.Put(ctx, workload, "./workload", c.Node(nodes+1))
c.Start(ctx, t, c.Range(1, nodes))

// Start the cluster with an extremely high txn liveness threshold.
// If requests ever get stuck on a transaction that was abandoned
// then it will take 10m for them to get unstuck, at which point the
// QPS threshold check in the test is guaranteed to fail.
args := startArgs("--env=COCKROACH_TXN_LIVENESS_HEARTBEAT_MULTIPLIER=600")
c.Start(ctx, t, args, c.Range(1, nodes))

// Enable request tracing, which is a good tool for understanding
// how different transactions are interacting.
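
Note: the 10m figure in the comment above follows from the 1s txn heartbeat interval (see the pkg/base/constants.go hunk) and the 600x multiplier, assuming the liveness threshold is computed as multiplier × heartbeat interval (consistent with the pkg/storage/replica_test.go hunk further down). A minimal sketch of the arithmetic:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Values taken from the hunks in this PR.
	const defaultTxnHeartbeatInterval = 1 * time.Second
	const livenessHeartbeatMultiplier = 600 // COCKROACH_TXN_LIVENESS_HEARTBEAT_MULTIPLIER

	// Assumed derivation: threshold = multiplier * txn heartbeat interval.
	threshold := livenessHeartbeatMultiplier * defaultTxnHeartbeatInterval
	fmt.Println(threshold) // 10m0s
}
```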

2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
@@ -265,7 +265,7 @@ func (r *Registry) LoadJobWithTxn(ctx context.Context, jobID int64, txn *client.

// DefaultCancelInterval is a reasonable interval at which to poll this node
// for liveness failures and cancel running jobs.
var DefaultCancelInterval = base.DefaultHeartbeatInterval
var DefaultCancelInterval = base.DefaultTxnHeartbeatInterval

// DefaultAdoptInterval is a reasonable interval at which to poll system.jobs
// for jobs with expired leases.

2 changes: 1 addition & 1 deletion pkg/kv/txn_coord_sender.go
@@ -455,7 +455,7 @@ func NewTxnCoordSenderFactory(
tcf.st = cluster.MakeTestingClusterSettings()
}
if tcf.heartbeatInterval == 0 {
tcf.heartbeatInterval = base.DefaultHeartbeatInterval
tcf.heartbeatInterval = base.DefaultTxnHeartbeatInterval
}
if tcf.metrics == (TxnMetrics{}) {
tcf.metrics = MakeTxnMetrics(metric.TestSampleInterval)

2 changes: 1 addition & 1 deletion pkg/rpc/context.go
@@ -437,7 +437,7 @@ func NewContext(
var cancel context.CancelFunc
ctx.masterCtx, cancel = context.WithCancel(ambient.AnnotateCtx(context.Background()))
ctx.Stopper = stopper
ctx.heartbeatInterval = baseCtx.HeartbeatInterval
ctx.heartbeatInterval = baseCtx.RPCHeartbeatInterval
ctx.RemoteClocks = newRemoteClockMonitor(
ctx.LocalClock, 10*ctx.heartbeatInterval, baseCtx.HistogramWindowInterval)
ctx.heartbeatTimeout = 2 * ctx.heartbeatInterval

2 changes: 1 addition & 1 deletion pkg/rpc/nodedialer/nodedialer_test.go
@@ -232,7 +232,7 @@ func newTestServer(
func newTestContext(clock *hlc.Clock, stopper *stop.Stopper) *rpc.Context {
cfg := testutils.NewNodeTestBaseContext()
cfg.Insecure = true
cfg.HeartbeatInterval = 10 * time.Millisecond
cfg.RPCHeartbeatInterval = 10 * time.Millisecond
rctx := rpc.NewContext(
log.AmbientContext{Tracer: tracing.NewTracer()},
cfg,

1 change: 0 additions & 1 deletion pkg/storage/batcheval/cmd_scan.go
@@ -80,5 +80,4 @@ func Scan(
reply.IntentRows, err = CollectIntentRows(ctx, batch, cArgs, intents)
}
return result.FromIntents(intents, args), err

}
3 changes: 1 addition & 2 deletions pkg/storage/client_merge_test.go
@@ -1072,8 +1072,7 @@ func TestStoreRangeMergeInFlightTxns(t *testing.T) {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration)
defer cancel()
defer func(old time.Duration) { txnwait.TxnLivenessThreshold = old }(txnwait.TxnLivenessThreshold)
txnwait.TxnLivenessThreshold = 2 * testutils.DefaultSucceedsSoonDuration
defer txnwait.TestingOverrideTxnLivenessThreshold(2 * testutils.DefaultSucceedsSoonDuration)()

// Create a transaction that won't complete until after the merge.
txn1 := client.NewTxn(ctx, store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
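
Note: the txnwait.TestingOverrideTxnLivenessThreshold helper introduced here is defined outside the hunks shown on this page. A minimal sketch of what it is assumed to look like, based on the save-and-restore pattern it replaces (signature, body, and default value are assumptions, not the actual cockroach source):

```go
package txnwait // hypothetical sketch, not the actual cockroach source

import "time"

// TxnLivenessThreshold is the package-level threshold being overridden; the
// 5s default is assumed from the 5x heartbeat wording in the constants.go hunk.
var TxnLivenessThreshold = 5 * time.Second

// TestingOverrideTxnLivenessThreshold is assumed to swap the threshold and
// return a closure that restores the previous value; the caller defers that
// closure, e.g. defer TestingOverrideTxnLivenessThreshold(d)().
func TestingOverrideTxnLivenessThreshold(t time.Duration) func() {
	old := TxnLivenessThreshold
	TxnLivenessThreshold = t
	return func() { TxnLivenessThreshold = old }
}
```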

2 changes: 1 addition & 1 deletion pkg/storage/client_raft_test.go
@@ -485,7 +485,7 @@ func TestFailedReplicaChange(t *testing.T) {

// The first failed replica change has laid down intents. Make sure those
// are pushable by making the transaction abandoned.
mtc.manualClock.Increment(10 * base.DefaultHeartbeatInterval.Nanoseconds())
mtc.manualClock.Increment(10 * base.DefaultTxnHeartbeatInterval.Nanoseconds())

if _, err := repl.ChangeReplicas(
context.Background(),

20 changes: 11 additions & 9 deletions pkg/storage/intentresolver/contention_queue.go
@@ -138,25 +138,26 @@ func txnID(txn *roachpb.Transaction) string {
// single intent (len(wiErr.Intents) == 1).
//
// Returns a cleanup function to be invoked by the caller after the
// original request completes, a possibly updated WriteIntentError and
// original request completes, a possibly updated WriteIntentError,
// a bool indicating whether the intent resolver should regard the
// original push / resolve as no longer applicable and skip those
// steps to retry the original request that generated the
// WriteIntentError. The cleanup function takes two arguments, a
// newWIErr, non-nil in case the re-executed request experienced
// WriteIntentError, and an error if one was encountered while
// waiting in the queue. The cleanup function takes two arguments,
// a newWIErr, non-nil in case the re-executed request experienced
// another write intent error and could not complete; and
// newIntentTxn, nil if the re-executed request left no intent, and
// non-nil if it did. At most one of these two arguments should be
// provided.
func (cq *contentionQueue) add(
ctx context.Context, wiErr *roachpb.WriteIntentError, h roachpb.Header,
) (CleanupFunc, *roachpb.WriteIntentError, bool) {
) (CleanupFunc, *roachpb.WriteIntentError, bool, *roachpb.Error) {
if len(wiErr.Intents) != 1 {
log.Fatalf(ctx, "write intent error must contain only a single intent: %s", wiErr)
}
if hasExtremePriority(h) {
// Never queue maximum or minimum priority transactions.
return nil, wiErr, false
return nil, wiErr, false, nil
}
intent := wiErr.Intents[0]
key := string(intent.Span.Key)
@@ -194,6 +195,7 @@ func (cq *contentionQueue) add(

// Wait on prior pusher, if applicable.
var done bool
var pErr *roachpb.Error
if waitCh != nil {
var detectCh chan struct{}
var detectReady <-chan time.Time
@@ -237,7 +239,7 @@ func (cq *contentionQueue) add(

case <-ctx.Done():
// The pusher's context is done. Return without pushing.
done = true
pErr = roachpb.NewError(ctx.Err())
break Loop

case <-detectReady:
@@ -271,8 +273,8 @@ func (cq *contentionQueue) add(
})
log.VEventf(ctx, 3, "%s pushing %s to detect dependency cycles", txnID(curPusher.txn), pusheeTxn.ID.Short())
if err := cq.db.Run(ctx, b); err != nil {
log.VErrEventf(ctx, 2, "while waiting in push contention queue to push %s: %s", pusheeTxn.ID.Short(), b.MustPErr())
done = true // done=true to avoid uselessly trying to push and resolve
pErr = b.MustPErr()
log.VErrEventf(ctx, 2, "while waiting in push contention queue to push %s: %s", pusheeTxn.ID.Short(), pErr)
break Loop
}
// Note that this pusher may have aborted the pushee, but it
@@ -347,7 +349,7 @@ func (cq *contentionQueue) add(
curPusher.waitCh <- newIntentTxn
close(curPusher.waitCh)
}
}, wiErr, done
}, wiErr, done, pErr
}

func hasExtremePriority(h roachpb.Header) bool {

6 changes: 4 additions & 2 deletions pkg/storage/intentresolver/intent_resolver.go
@@ -245,11 +245,13 @@ func (ir *IntentResolver) ProcessWriteIntentError(
var cleanup func(*roachpb.WriteIntentError, *enginepb.TxnMeta)
if len(wiErr.Intents) == 1 && len(wiErr.Intents[0].Span.EndKey) == 0 {
var done bool
var pErr *roachpb.Error
// Note that the write intent error may be mutated here in the event
// that this pusher is queued to wait for a different transaction
// instead.
if cleanup, wiErr, done = ir.contentionQ.add(ctx, wiErr, h); done {
return cleanup, nil
cleanup, wiErr, done, pErr = ir.contentionQ.add(ctx, wiErr, h)
if done || pErr != nil {
return cleanup, pErr
}
}


68 changes: 49 additions & 19 deletions pkg/storage/intentresolver/intent_resolver_test.go
@@ -408,7 +408,7 @@ func TestContendedIntent(t *testing.T) {
{pusher: roTxn2, expTxns: []*roachpb.Transaction{roTxn1, roTxn2}},
{pusher: roTxn3, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3}},
// The fourth txn will be canceled before its predecessor is cleaned up to
// excersize the cancellation code path.
// exercise the cancellation code path.
{pusher: roTxn4, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4}},
// Now, verify that a writing txn is inserted at the end of the queue.
{pusher: rwTxn1, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4, rwTxn1}},
@@ -417,7 +417,12 @@
{pusher: rwTxn3, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4, rwTxn1, rwTxn2, rwTxn3}},
}
var wg sync.WaitGroup
cleanupFuncs := make(chan CleanupFunc)
type intentResolverResp struct {
idx int
fn CleanupFunc
pErr *roachpb.Error
}
resps := make(chan intentResolverResp, 1)
for i, tc := range testCases {
testCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -429,14 +434,11 @@
}}}
h := roachpb.Header{Txn: tc.pusher}
wg.Add(1)
go func() {
go func(idx int) {
defer wg.Done()
cleanupFunc, pErr := ir.ProcessWriteIntentError(testCtx, roachpb.NewError(wiErr), nil, h, roachpb.PUSH_ABORT)
if pErr != nil {
t.Errorf("unexpected error from ProcessWriteIntentError: %v", pErr)
}
cleanupFuncs <- cleanupFunc
wg.Done()
}()
resps <- intentResolverResp{idx: idx, fn: cleanupFunc, pErr: pErr}
}(i)
testutils.SucceedsSoon(t, func() error {
if lc, let := ir.NumContended(keyA), len(tc.expTxns); lc != let {
return errors.Errorf("expected len %d; got %d", let, lc)
@@ -461,11 +463,34 @@
}

// Wait until all of the WriteIntentErrors have been processed to stop
// processing the cleanupFuncs.
go func() { wg.Wait(); close(cleanupFuncs) }()
i := 0
for f := range cleanupFuncs {
switch i {
// processing the resps.
go func() { wg.Wait(); close(resps) }()
expIdx := 0
for resp := range resps {
// Check index.
idx := resp.idx
if idx != expIdx {
t.Errorf("expected response from request %d, found %d", expIdx, idx)
}
expIdx++

// Check error.
pErr := resp.pErr
switch idx {
case 3:
// Expected to be canceled.
if !testutils.IsPError(pErr, context.Canceled.Error()) {
t.Errorf("expected context canceled error; got %v", pErr)
}
default:
if pErr != nil {
t.Errorf("unexpected error from ProcessWriteIntentError: %v", pErr)
}
}

// Call cleanup function.
f := resp.fn
switch idx {
// The read only transactions should be cleaned up with nil, nil.
case 0:
// There should be a push of orig and then a resolve of intent.
@@ -497,13 +522,17 @@
verifyResolveIntent(<-reqChan, keyA)
})
}
fallthrough
f(nil, nil)
case 1, 2, 3:
// The remaining roTxns should not do anything upon cleanup.
f(nil, nil)
if i == 1 {
if idx == 2 {
// Cancel request 3 before request 2 cleans up. Wait for
// it to return an error before continuing.
testCases[3].cancelFunc()
cf3 := <-resps
resps <- cf3
}
f(nil, nil)
case 4:
// Call the CleanupFunc with a new WriteIntentError with a different
transaction. This should lead to a new push on the new transaction and
@@ -521,9 +550,10 @@
Txn: rwTxn1.TxnMeta,
}}}, nil)
case 6:
f(nil, &testCases[i].pusher.TxnMeta)
f(nil, &testCases[idx].pusher.TxnMeta)
default:
t.Fatalf("unexpected response %d", idx)
}
i++
}
}


5 changes: 4 additions & 1 deletion pkg/storage/replica_proposal.go
@@ -791,13 +791,16 @@ func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.Loca
// Non-state updates and actions.
// ======================

// The caller is required to detach and handle intents.
// The caller is required to detach and handle the following three fields.
if lResult.Intents != nil {
log.Fatalf(ctx, "LocalEvalResult.Intents should be nil: %+v", lResult.Intents)
}
if lResult.EndTxns != nil {
log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns)
}
if lResult.MaybeWatchForMerge {
log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false")
}

if lResult.GossipFirstRange {
// We need to run the gossip in an async task because gossiping requires

2 changes: 1 addition & 1 deletion pkg/storage/replica_test.go
@@ -5043,7 +5043,7 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
const indetCommitError = "txn in indeterminate STAGING state"

m := int64(txnwait.TxnLivenessHeartbeatMultiplier)
ns := base.DefaultHeartbeatInterval.Nanoseconds()
ns := base.DefaultTxnHeartbeatInterval.Nanoseconds()
testCases := []struct {
status roachpb.TransactionStatus // -1 for no record
heartbeatOffset int64 // nanoseconds from original timestamp, 0 for no heartbeat

6 changes: 3 additions & 3 deletions pkg/storage/store.go
@@ -74,8 +74,8 @@ import (

const (
// rangeIDAllocCount is the number of Range IDs to allocate per allocation.
rangeIDAllocCount = 10
defaultHeartbeatIntervalTicks = 5
rangeIDAllocCount = 10
defaultRaftHeartbeatIntervalTicks = 5

// defaultRaftEntryCacheSize is the default size in bytes for a
// store's Raft log entry cache.
@@ -748,7 +748,7 @@ func (sc *StoreConfig) SetDefaults() {
sc.CoalescedHeartbeatsInterval = sc.RaftTickInterval / 2
}
if sc.RaftHeartbeatIntervalTicks == 0 {
sc.RaftHeartbeatIntervalTicks = defaultHeartbeatIntervalTicks
sc.RaftHeartbeatIntervalTicks = defaultRaftHeartbeatIntervalTicks
}
if sc.RaftEntryCacheSize == 0 {
sc.RaftEntryCacheSize = defaultRaftEntryCacheSize