Skip to content

Commit

Permalink
Merge #31530 #31682
Browse files Browse the repository at this point in the history
31530: sql: use schema change lease only for long running schema changes r=vivekmenezes a=vivekmenezes

Schema changes like RENAME TABLE that only change the schema
and run no data backfills need not need to hold the schema change
lease. The schema change post processing normally happens on
the gateway node. If the gateway fails, the post processing will
be picked up by another node. If more than one node picks it up,
they can all execute the post processing because it is idempotent.

fixes #18983

Release note: None

31682: roachprod-stress: request traceback from all goroutines r=benesch a=benesch

This makes debugging much easier.

Release note: None

Co-authored-by: Vivek Menezes <vivek@cockroachlabs.com>
Co-authored-by: Nikhil Benesch <nikhil.benesch@gmail.com>
  • Loading branch information
3 people committed Oct 22, 2018
3 parents c549d9a + 2ea09f0 + 5e3975b commit 9f994ab
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 88 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/roachprod-stress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func run() error {
var stderr bytes.Buffer
cmd := exec.Command("roachprod",
"ssh", fmt.Sprintf("%s:%d", cluster, i), "--",
fmt.Sprintf("./stress %s", strings.Join(os.Args[2:], " ")))
fmt.Sprintf("GOTRACEBACK=all ./stress %s", strings.Join(os.Args[2:], " ")))
cmd.Stdout = stdoutW
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
Expand Down
191 changes: 104 additions & 87 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,59 +471,77 @@ func (sc *SchemaChanger) truncateTable(
return nil
}

// maybe Add/Drop a table depending on the state of a table descriptor.
// This method returns true if the table is deleted.
func (sc *SchemaChanger) maybeAddDrop(
ctx context.Context,
inSession bool,
lease *sqlbase.TableDescriptor_SchemaChangeLease,
table *sqlbase.TableDescriptor,
evalCtx *extendedEvalContext,
) (bool, error) {
if table.Dropped() {
if err := sc.ExtendLease(ctx, lease); err != nil {
return false, err
}
// maybe Drop a table. Return nil if successfully dropped.
func (sc *SchemaChanger) maybeDropTable(
ctx context.Context, inSession bool, table *sqlbase.TableDescriptor, evalCtx *extendedEvalContext,
) error {
if !table.Dropped() || inSession {
return nil
}

if inSession {
return false, nil
// This can happen if a change other than the drop originally
// scheduled the changer for this table. If that's the case,
// we still need to wait for the deadline to expire.
if table.DropTime != 0 {
var timeRemaining time.Duration
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
timeRemaining = 0
_, zoneCfg, _, err := GetZoneConfigInTxn(ctx, txn, uint32(table.ID),
&sqlbase.IndexDescriptor{}, "", false /* getInheritedDefault */)
if err != nil {
return err
}
deadline := table.DropTime + int64(zoneCfg.GC.TTLSeconds)*time.Second.Nanoseconds()
timeRemaining = timeutil.Since(timeutil.Unix(0, deadline))
return nil
}); err != nil {
return err
}
if timeRemaining < 0 {
return errNotHitGCTTLDeadline
}
}

// This can happen if a change other than the drop originally
// scheduled the changer for this table. If that's the case,
// we still need to wait for the deadline to expire.
if table.DropTime != 0 {
var timeRemaining time.Duration
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
timeRemaining = 0
_, zoneCfg, _, err := GetZoneConfigInTxn(ctx, txn, uint32(table.ID),
&sqlbase.IndexDescriptor{}, "", false /* getInheritedDefault */)
if err != nil {
return err
}
deadline := table.DropTime + int64(zoneCfg.GC.TTLSeconds)*time.Second.Nanoseconds()
timeRemaining = timeutil.Since(timeutil.Unix(0, deadline))
return nil
}); err != nil {
return false, err
}
if timeRemaining < 0 {
return false, errNotHitGCTTLDeadline
}
// Acquire lease.
lease, err := sc.AcquireLease(ctx)
if err != nil {
return err
}
needRelease := true
// Always try to release lease.
defer func() {
// If the schema changer deleted the descriptor, there's no longer a lease to be
// released.
if !needRelease {
return
}
// Do all the hard work of deleting the table data and the table ID.
if err := sc.truncateTable(ctx, lease, table, evalCtx); err != nil {
return false, err
if err := sc.ReleaseLease(ctx, lease); err != nil {
log.Warning(ctx, err)
}
}()

return true, sc.DropTableDesc(ctx, table, false /* traceKV */)
// Do all the hard work of deleting the table data and the table ID.
if err := sc.truncateTable(ctx, &lease, table, evalCtx); err != nil {
return err
}

if err := sc.DropTableDesc(ctx, table, false /* traceKV */); err != nil {
return err
}
// The descriptor was deleted.
needRelease = false
return nil
}

// maybe make a table PUBLIC if it's in the ADD state.
func (sc *SchemaChanger) maybeMakeAddTablePublic(
ctx context.Context, table *sqlbase.TableDescriptor,
) error {
if table.Adding() {
for _, idx := range table.AllNonDropIndexes() {
if idx.ForeignKey.IsSet() {
if err := sc.waitToUpdateLeases(ctx, idx.ForeignKey.Table); err != nil {
return false, err
return err
}
}
}
Expand All @@ -532,23 +550,23 @@ func (sc *SchemaChanger) maybeAddDrop(
ctx,
table.ID,
func(tbl *sqlbase.TableDescriptor) error {
if !tbl.Adding() {
return errDidntUpdateDescriptor
}
tbl.State = sqlbase.TableDescriptor_PUBLIC
return nil
},
func(txn *client.Txn) error { return nil },
); err != nil {
return false, err
return err
}
}

return false, nil
return nil
}

func (sc *SchemaChanger) maybeGCMutations(
ctx context.Context,
inSession bool,
lease *sqlbase.TableDescriptor_SchemaChangeLease,
table *sqlbase.TableDescriptor,
ctx context.Context, inSession bool, table *sqlbase.TableDescriptor,
) error {
if inSession || len(table.GCMutations) == 0 || len(sc.dropIndexTimes) == 0 {
return nil
Expand All @@ -559,10 +577,6 @@ func (sc *SchemaChanger) maybeGCMutations(
return nil
}

if err := sc.ExtendLease(ctx, lease); err != nil {
return err
}

// Find dropped index with earliest GC deadline.
dropped := sc.dropIndexTimes[0]
for i := 1; i < len(sc.dropIndexTimes); i++ {
Expand Down Expand Up @@ -593,11 +607,23 @@ func (sc *SchemaChanger) maybeGCMutations(
return nil
}

if err := sc.truncateIndexes(ctx, lease, table.Version, []sqlbase.IndexDescriptor{{ID: mutation.IndexID}}); err != nil {
// Acquire lease.
lease, err := sc.AcquireLease(ctx)
if err != nil {
return err
}
// Always try to release lease.
defer func() {
if err := sc.ReleaseLease(ctx, lease); err != nil {
log.Warning(ctx, err)
}
}()

if err := sc.truncateIndexes(ctx, &lease, table.Version, []sqlbase.IndexDescriptor{{ID: mutation.IndexID}}); err != nil {
return err
}

_, err := sc.leaseMgr.Publish(
_, err = sc.leaseMgr.Publish(
ctx,
table.ID,
func(tbl *sqlbase.TableDescriptor) error {
Expand Down Expand Up @@ -681,13 +707,7 @@ func (sc *SchemaChanger) updateDropTableJob(
}

// Drain old names from the cluster.
func (sc *SchemaChanger) drainNames(
ctx context.Context, lease *sqlbase.TableDescriptor_SchemaChangeLease,
) error {
if err := sc.ExtendLease(ctx, lease); err != nil {
return err
}

func (sc *SchemaChanger) drainNames(ctx context.Context) error {
// Publish a new version with all the names drained after everyone
// has seen the version with the new name. All the draining names
// can be reused henceforth.
Expand Down Expand Up @@ -740,23 +760,6 @@ func (sc *SchemaChanger) exec(
log.Infof(ctx, "exec pending schema change; table: %d, mutation: %d",
sc.tableID, sc.mutationID)
}
// Acquire lease.
lease, err := sc.AcquireLease(ctx)
if err != nil {
return err
}
needRelease := true
// Always try to release lease.
defer func() {
// If the schema changer deleted the descriptor, there's no longer a lease to be
// released.
if !needRelease {
return
}
if err := sc.ReleaseLease(ctx, lease); err != nil {
log.Warning(ctx, err)
}
}()

tableDesc, notFirst, err := sc.notFirstInLine(ctx)
if err != nil {
Expand All @@ -767,36 +770,51 @@ func (sc *SchemaChanger) exec(
}

if tableDesc.HasDrainingNames() {
if err := sc.drainNames(ctx, &lease); err != nil {
if err := sc.drainNames(ctx); err != nil {
return err
}
}

if err := sc.maybeGCMutations(ctx, inSession, &lease, tableDesc); err != nil {
// Delete dropped table data if possible.
if err := sc.maybeDropTable(ctx, inSession, tableDesc, evalCtx); err != nil {
return err
}

if drop, err := sc.maybeAddDrop(ctx, inSession, &lease, tableDesc, evalCtx); err != nil {
if err := sc.maybeMakeAddTablePublic(ctx, tableDesc); err != nil {
return err
}

if err := sc.maybeGCMutations(ctx, inSession, tableDesc); err != nil {
return err
} else if drop {
needRelease = false
return nil
}

// Wait for the schema change to propagate to all nodes after this function
// returns, so that the new schema is live everywhere. This is not needed for
// correctness but is done to make the UI experience/tests predictable.
defer func() {
waitToUpdateLeases := func() {
if err := sc.waitToUpdateLeases(ctx, sc.tableID); err != nil {
log.Warning(ctx, err)
}
}()
}

if sc.mutationID == sqlbase.InvalidMutationID {
// Nothing more to do.
waitToUpdateLeases()
return nil
}

// Acquire lease.
lease, err := sc.AcquireLease(ctx)
if err != nil {
return err
}
// Always try to release lease.
defer func() {
if err := sc.ReleaseLease(ctx, lease); err != nil {
log.Warning(ctx, err)
}
}()

// Find our job.
foundJobID := false
for _, g := range tableDesc.MutationJobs {
Expand All @@ -823,8 +841,7 @@ func (sc *SchemaChanger) exec(
}
}

// Another transaction might set the up_version bit again,
// but we're no longer responsible for taking care of that.
defer waitToUpdateLeases()

// Run through mutation state machine and backfill.
err = sc.runStateMachineAndBackfill(ctx, &lease, evalCtx)
Expand Down
69 changes: 69 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3724,3 +3724,72 @@ func TestSchemaChangeGRPCError(t *testing.T) {
t.Fatal(err)
}
}

// TestBlockedSchemaChange tests whether a schema change that
// has no data backfill processing will be blocked by a schema
// change that is holding the schema change lease while backfill
// processing.
func TestBlockedSchemaChange(t *testing.T) {
defer leaktest.AfterTest(t)()

const maxValue = 100
notifyBackfill := make(chan struct{})
tableRenameDone := make(chan struct{})

params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeBackfill: func() error {
if notify := notifyBackfill; notify != nil {
notifyBackfill = nil
close(notify)
<-tableRenameDone
}
return nil
},
},
}
s, db, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
`)

// Bulk insert.
if err := bulkInsertIntoTable(db, maxValue); err != nil {
t.Fatal(err)
}

ctx := context.TODO()
if err := checkTableKeyCount(ctx, kvDB, 1, maxValue); err != nil {
t.Fatal(err)
}

notification := notifyBackfill

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if _, err := db.Exec(`CREATE INDEX foo ON t.public.test (v)`); err != nil {
t.Error(err)
}
}()

<-notification

if _, err := db.Exec(`ALTER TABLE t.test RENAME TO t.newtest`); err != nil {
t.Fatal(err)
}

close(tableRenameDone)

if _, err := db.Query(`SELECT x from t.test`); !testutils.IsError(err, `relation "t.test" does not exist`) {
t.Fatalf("err = %+v", err)
}

wg.Wait()
}

0 comments on commit 9f994ab

Please sign in to comment.