Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
45595: sql: make cleanup jobs spawned by alter primary key not cancelable r=spaskob a=rohany

Fixes cockroachdb#45500.

This PR makes the job spawned by ALTER PRIMARY KEY that cleans
up indexes be uncancelable.

This PR additionally fixes a related bug where ALTER PRIMARY KEY
would spawn a job when it didn't actually enqueue any mutations
on the table descriptor, causing future schema changes on the
table to hang.

Release note (sql change): This PR makes the cleanup job spawned
by ALTER PRIMARY KEY in some cases uncancelable.

45603: storage/txnwait: terminate push when pusher aborted at lower epoch r=nvanbenschoten a=nvanbenschoten

Closes cockroachdb#40786.
Closes cockroachdb#44336.

This commit resolves a bug in distributed deadlock detection that would
allow a deadlock between transactions to go undetected, stalling the
workload indefinitely.

The issue materialized as follows:
1. two transactions would deadlock and each enter a txnwait queue
2. they would poll their pushee's record along with their own
3. deadlock detection would eventually pick this up and abort one of the txns
   using the pusher's copy of the txn's proto
4. however, the aborted txn has since restarted and bumped its epoch
5. the aborted txn continued to query its record, but failed to ingest any
   updates from it because the record was at a lower epoch than its own
   copy of its txn proto. So it never noticed that it was ABORTED
6. all other txns in the system including the original contending txn
   piled up behind the aborted txn in the contention queue, waiting for
   it to notice it was aborted and exit the queue
7. deadlock!

I'm optimistically closing the two `kv/contention/nodes=4` issues both
because I hope this is the cause of their recent troubles and also because
I've been spending a lot of time with the test recently in light of cockroachdb#45482
and plan to stabilize it fully.

I plan to backport this to release-19.2. This doesn't need to go all the
way back to release-19.1 because this was introduced in aed892a.

Co-authored-by: Rohan Yadav <rohany@alumni.cmu.edu>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
3 people committed Mar 3, 2020
3 parents ada086e + 3d7aeb3 + 0a1f251 commit aeb41dc
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 317 deletions.
8 changes: 8 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ type Record struct {
Details jobspb.Details
Progress jobspb.ProgressDetails
RunningStatus RunningStatus
// NonCancelable is used to denote when a job cannot be canceled. This field
// will not be respected in mixed version clusters where some nodes have
// a version < 20.1, so it can only be used in cases where all nodes having
// versions >= 20.1 is guaranteed.
NonCancelable bool
}

// StartableJob is a job created with a transaction to be started later.
Expand Down Expand Up @@ -375,6 +380,9 @@ func (j *Job) cancelRequested(
ctx context.Context, fn func(context.Context, *client.Txn) error,
) error {
return j.Update(ctx, func(txn *client.Txn, md JobMetadata, ju *JobUpdater) error {
if md.Payload.Noncancelable {
return errors.Newf("job %d: not cancelable", j.ID())
}
if md.Status == StatusCancelRequested || md.Status == StatusCanceled {
return nil
}
Expand Down
485 changes: 263 additions & 222 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ message Payload {
// needed.
errorspb.EncodedError final_resume_error = 19;
Lease lease = 9;
// Noncancelable is used to denote when a job cannot be canceled. This field
// will not be respected in mixed version clusters where some nodes have
// a version < 20.1, so it can only be used in cases where all nodes having
// versions >= 20.1 is guaranteed.
bool noncancelable = 20;
oneof details {
BackupDetails backup = 10;
RestoreDetails restore = 11;
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (r *Registry) NewJob(record Record) *Job {
Username: record.Username,
DescriptorIDs: record.DescriptorIDs,
Details: jobspb.WrapPayloadDetails(record.Details),
Noncancelable: record.NonCancelable,
}
job.mu.progress = jobspb.Progress{
Details: jobspb.WrapProgressDetails(record.Progress),
Expand Down
11 changes: 9 additions & 2 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,15 @@ func (t *Transaction) Update(o *Transaction) {
t.IgnoredSeqNums = o.IgnoredSeqNums
}
} else /* t.Epoch > o.Epoch */ {
// Ignore epoch-specific state from previous epoch.
if o.Status == COMMITTED {
// Ignore epoch-specific state from previous epoch. However, ensure that
// the transaction status still makes sense.
switch o.Status {
case ABORTED:
// Once aborted, always aborted. The transaction coordinator might
// have incremented the txn's epoch without realizing that it was
// aborted.
t.Status = ABORTED
case COMMITTED:
log.Warningf(context.Background(), "updating txn %s with COMMITTED txn at earlier epoch %s", t.String(), o.String())
}
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,25 @@ func TestTransactionUpdateStaging(t *testing.T) {
}
}

// TestTransactionUpdateAbortedOldEpoch checks that Transaction.Update carries
// an ABORTED status over to the receiver even when the proto supplying that
// status is at an earlier epoch than the receiver. The receiver here is a
// PENDING copy whose epoch has been incremented by one; after the update it
// must keep its incremented epoch but report ABORTED.
func TestTransactionUpdateAbortedOldEpoch(t *testing.T) {
	// The aborted proto at the original (older) epoch.
	aborted := nonZeroTxn
	aborted.Status = ABORTED

	// The same transaction after a restart: one epoch ahead and still
	// believing that it is PENDING.
	restarted := aborted
	restarted.Status = PENDING
	restarted.Epoch++

	// Feed the older-epoch ABORTED proto into the restarted transaction.
	restarted.Update(&aborted)

	// Expected outcome: identical to the aborted proto except for the
	// incremented epoch, with the status set to ABORTED.
	want := aborted
	want.Status = ABORTED
	want.Epoch++
	require.Equal(t, want, restarted)
}

func TestTransactionClone(t *testing.T) {
txnPtr := nonZeroTxn.Clone()
txn := *txnPtr
Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/alter_primary_key
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,32 @@ t CREATE TABLE t (
CONSTRAINT "primary" PRIMARY KEY (x ASC),
FAMILY "primary" (x, rowid)
)

# Ensure that we cannot cancel the index cleanup jobs spawned by
# a primary key change.
statement ok
DROP TABLE IF EXISTS t;
CREATE TABLE t (x INT PRIMARY KEY, y INT NOT NULL);
ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (y)

statement error pq: job [0-9]*: not cancelable
CANCEL JOB (
SELECT job_id FROM [SHOW JOBS] WHERE
description = 'CLEANUP JOB for ''ALTER TABLE test.public.t ALTER PRIMARY KEY USING COLUMNS (y)''' AND
status = 'running'
)

# Ensure that starting a primary key change that does not
# enqueue any mutations doesn't start a job.
# TODO (rohany): This test might become obsolete when #44923 is fixed.
statement ok
DROP TABLE IF EXISTS t;
CREATE TABLE t (x INT NOT NULL);
ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (x)

query I
SELECT job_id FROM [SHOW JOBS] WHERE
description = 'CLEANUP JOB for ''ALTER TABLE test.public.t ALTER PRIMARY KEY USING COLUMNS (y)''' AND
status = 'running'
----

35 changes: 20 additions & 15 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1414,7 +1414,7 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
}

// If we performed MakeMutationComplete on a PrimaryKeySwap mutation, then we need to start
// a job for the index deletion mutations that the primary key swap mutation added, if any.
// a job for the index deletion mutations that the primary key swap mutation added.
mutationID := scDesc.ClusterVersion.NextMutationID
span := scDesc.PrimaryIndexSpan()
var spanList []jobspb.ResumeSpanList
Expand All @@ -1425,21 +1425,26 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
},
)
}
jobRecord := jobs.Record{
Description: fmt.Sprintf("Cleanup job for '%s'", sc.job.Payload().Description),
Username: sc.job.Payload().Username,
DescriptorIDs: sqlbase.IDs{scDesc.GetID()},
Details: jobspb.SchemaChangeDetails{ResumeSpanList: spanList},
Progress: jobspb.SchemaChangeProgress{},
}
job := sc.jobRegistry.NewJob(jobRecord)
if err := job.Created(ctx); err != nil {
return err
// Only start a job if spanList has any spans. If len(spanList) == 0, then
// no mutations were enqueued by the primary key change.
if len(spanList) > 0 {
jobRecord := jobs.Record{
Description: fmt.Sprintf("CLEANUP JOB for '%s'", sc.job.Payload().Description),
Username: sc.job.Payload().Username,
DescriptorIDs: sqlbase.IDs{scDesc.GetID()},
Details: jobspb.SchemaChangeDetails{ResumeSpanList: spanList},
Progress: jobspb.SchemaChangeProgress{},
NonCancelable: true,
}
job := sc.jobRegistry.NewJob(jobRecord)
if err := job.Created(ctx); err != nil {
return err
}
scDesc.MutationJobs = append(scDesc.MutationJobs, sqlbase.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: *job.ID(),
})
}
scDesc.MutationJobs = append(scDesc.MutationJobs, sqlbase.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: *job.ID(),
})
}
i++
}
Expand Down
175 changes: 97 additions & 78 deletions pkg/storage/txn_wait_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package storage
import (
"bytes"
"context"
"fmt"
"reflect"
"regexp"
"sync/atomic"
Expand Down Expand Up @@ -582,85 +583,103 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunTrueAndFalse(t, "txnRecordExists", func(t *testing.T, txnRecordExists bool) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

txn, err := createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
var pusher *roachpb.Transaction
if txnRecordExists {
pusher, err = createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
} else {
pusher = newTransaction("pusher", roachpb.Key("a"), 1, tc.Clock())
}

req := roachpb.PushTxnRequest{
PushType: roachpb.PUSH_ABORT,
PusherTxn: *pusher,
PusheeTxn: txn.TxnMeta,
}

q := tc.repl.concMgr.TxnWaitQueue()
q.Enable()
q.EnqueueTxn(txn)

retCh := make(chan RespWithErr, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), &req)
retCh <- RespWithErr{resp, pErr}
}()

testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher.ID}
if deps := q.GetDependents(txn.ID); !reflect.DeepEqual(deps, expDeps) {
return errors.Errorf("expected GetDependents %+v; got %+v", expDeps, deps)
}
return nil
})

// If the record doesn't exist yet, give the push queue enough
// time to query the missing record and notice.
if !txnRecordExists {
time.Sleep(10 * time.Millisecond)
// Test with the pusher txn record below the pusher's expected epoch, at
// the pusher's expected epoch, and above the pusher's expected epoch.
// Regardless of which epoch the transaction record is written at, if
// it is marked as ABORTED, it should terminate the push.
pushEpoch := enginepb.TxnEpoch(2)
for _, c := range []struct {
name string
recordEpoch enginepb.TxnEpoch
}{
{"below", pushEpoch - 1},
{"equal", pushEpoch},
{"above", pushEpoch + 1},
} {
t.Run(fmt.Sprintf("recordEpoch=%s", c.name), func(t *testing.T) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

txn, err := createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
var pusher *roachpb.Transaction
if txnRecordExists {
pusher, err = createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
} else {
pusher = newTransaction("pusher", roachpb.Key("a"), 1, tc.Clock())
}
pusher.Epoch = pushEpoch

req := roachpb.PushTxnRequest{
PushType: roachpb.PUSH_ABORT,
PusherTxn: *pusher,
PusheeTxn: txn.TxnMeta,
}

q := tc.repl.concMgr.TxnWaitQueue()
q.Enable()
q.EnqueueTxn(txn)

retCh := make(chan RespWithErr, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), &req)
retCh <- RespWithErr{resp, pErr}
}()

testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher.ID}
if deps := q.GetDependents(txn.ID); !reflect.DeepEqual(deps, expDeps) {
return errors.Errorf("expected GetDependents %+v; got %+v", expDeps, deps)
}
return nil
})

// If the record doesn't exist yet, give the push queue enough
// time to query the missing record and notice.
if !txnRecordExists {
time.Sleep(10 * time.Millisecond)
}

// Update txn on disk with status ABORTED.
pusherUpdate := *pusher
pusherUpdate.Epoch = c.recordEpoch
pusherUpdate.Status = roachpb.ABORTED
if err := writeTxnRecord(context.Background(), &tc, &pusherUpdate); err != nil {
t.Fatal(err)
}
q.UpdateTxn(context.Background(), &pusherUpdate)

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
expErr := "TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED)"
if !testutils.IsPError(respWithErr.pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, respWithErr.pErr)
}

m := tc.store.txnWaitMetrics
testutils.SucceedsSoon(t, func() error {
if act, exp := m.PusherWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushers, but want %d", act, exp)
}
if act, exp := m.PusheeWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushees, but want %d", act, exp)
}
if act, exp := m.QueryWaiting.Value(), int64(0); act != exp {
return errors.Errorf("%d queries, but want %d", act, exp)
}
return nil
})
})
}

// Update txn on disk with status ABORTED.
pusherUpdate := *pusher
pusherUpdate.Status = roachpb.ABORTED
if err := writeTxnRecord(context.Background(), &tc, &pusherUpdate); err != nil {
t.Fatal(err)
}
q.UpdateTxn(context.Background(), &pusherUpdate)

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
expErr := "TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED)"
if !testutils.IsPError(respWithErr.pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, respWithErr.pErr)
}

m := tc.store.txnWaitMetrics
testutils.SucceedsSoon(t, func() error {
if act, exp := m.PusherWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushers, but want %d", act, exp)
}
if act, exp := m.PusheeWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushees, but want %d", act, exp)
}
if act, exp := m.QueryWaiting.Value(), int64(0); act != exp {
return errors.Errorf("%d queries, but want %d", act, exp)
}
return nil
})
})
}

Expand Down

0 comments on commit aeb41dc

Please sign in to comment.