Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/schemachanger: prevent concurrent type desc changes #113639

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/sql/catalog/typedesc/type_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,13 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
return true
}
// Check if any enum members are transitioning, which should
// block declarative jobs.
for _, member := range desc.EnumMembers {
if member.Direction != descpb.TypeDescriptor_EnumMember_NONE {
return true
}
}
// TODO(fqazi): In the future we may not have concurrent declarative schema
// changes without a job ID. So, we should scan the elements involved for
// types.
Expand Down
65 changes: 65 additions & 0 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,67 @@ func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) {
"defaultdb", "t", expectedKeyCount)
}

typeSchemaChangeFunc := func(t *testing.T, modeFor1stStmt, modeFor2ndStmt sessiondatapb.NewSchemaChangerMode) {
ctx, cancel := context.WithCancel(context.Background())
addTypeStartedChan := make(chan struct{})
resumeAddTypeJobChan := make(chan struct{})
var closeAddTypeValueChanOnce sync.Once
schemaChangeWaitCompletedChan := make(chan struct{})

var params base.TestServerArgs
params.Knobs = base.TestingKnobs{
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
// If the blocked schema changer is from legacy schema changer, we let
// it hijack this knob (which is originally design for declarative
// schema changer) if `stmt` is nil.
WhileWaitingForConcurrentSchemaChanges: func(stmts []string) {
if (len(stmts) == 1 && strings.Contains(stmts[0], "DROP TYPE")) ||
stmts == nil {
closeAddTypeValueChanOnce.Do(func() {
close(resumeAddTypeJobChan)
close(schemaChangeWaitCompletedChan)
})
}
},
},
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
// Prevent the GC job from running so we ensure that all the keys which
// were written remain.
GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error {
<-ctx.Done()
return ctx.Err()
}},
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
defer cancel()
tdb := sqlutils.MakeSQLRunner(sqlDB)

tdb.Exec(t, "CREATE TYPE status AS ENUM ('open', 'closed', 'inactive');")

// Execute 1st DDL asynchronously and block until it's executing.
tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor1stStmt.String())
go func() {
tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints="typeschemachanger.before.exec"`)
tdb.ExpectErr(t,
".*was paused before it completed with reason: pause point \"typeschemachanger.before.exec\" hit",
`ALTER TYPE status ADD VALUE 'unknown';`)
close(addTypeStartedChan)
<-resumeAddTypeJobChan // wait for DROP TYPE to unblock me
tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints=""`)
tdb.Exec(t, "RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE status='paused' FETCH FIRST 1 ROWS ONLY);\n")
}()
<-addTypeStartedChan

// Execute 2st DDL synchronously. During waiting, it will unblock 1st DDL so
// it will eventually be able to proceed after waiting for a while.
tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor2ndStmt.String())
tdb.Exec(t, `DROP TYPE STATUS;`)
// After completion make sure we actually waited for schema changes.
<-schemaChangeWaitCompletedChan
}

t.Run("declarative-then-declarative", func(t *testing.T) {
tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
})
Expand All @@ -502,6 +563,10 @@ func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) {
tf(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
})

t.Run("typedesc legacy-then-declarative", func(t *testing.T) {
typeSchemaChangeFunc(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not run this in the other direction as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Give this a unique t.Run name

})

// legacy + legacy case is tested in TestLegacySchemaChangerWaitsForOtherSchemaChanges
// because the waiting occurred under a different code path.
}
Expand Down