upgrademanager: fix upgrade manager tests that relied on wrong invariants #100830

Merged 1 commit on Apr 6, 2023
82 changes: 51 additions & 31 deletions pkg/upgrade/upgrademanager/manager_external_test.go
@@ -56,32 +56,34 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// We're going to be migrating from startCV to endCV.
startCV := roachpb.Version{Major: 41}
endCV := roachpb.Version{Major: 42}
// clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs was chosen
// specifically so that all the migrations that introduce and backfill the new
// `system.job_info` have run by this point. In the future this startCV should
// be changed to V23_2Start and updated to the next Start key every time the
// compatibility window moves forward.
startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs
endCV := startCV + 1
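// NB (editor's note, not part of the PR): cluster version keys are ordered
// integers, so startCV+1 is simply the next version key, and
// clusterversion.ByKey converts a key to its roachpb.Version wherever a
// concrete version value is needed below.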

ch := make(chan chan error)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV, startCV, false),
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Server: &server.TestingKnobs{
BinaryVersionOverride: startCV,
BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
BinaryVersionOverride: clusterversion.ByKey(startCV),
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
DistSQL: &execinfra.TestingKnobs{
// See the TODO below for why we need this.
ProcessorNoTracingSpan: true,
},
UpgradeManager: &upgradebase.TestingKnobs{
ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version {
return []roachpb.Version{to}
},
RegistryOverride: func(v roachpb.Version) (upgradebase.Upgrade, bool) {
if v != endCV {
if v != clusterversion.ByKey(endCV) {
return nil, false
}
return upgrade.NewTenantUpgrade("test", v, upgrade.NoPrecondition, func(
@@ -107,13 +109,26 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)

// At this point the test cluster has run all the migrations until startCV.

upgrade1Err := make(chan error, 1)
go func() {
_, err := tc.ServerConn(0).ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
upgrade1Err <- err
}()
unblock := <-ch

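// Grab the ID, payload, and progress of the migration job that is now
// running; the test clones this job below.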
var firstID jobspb.JobID
var firstPayload, firstProgress []byte
require.NoError(t, tc.ServerConn(0).QueryRow(`
SELECT id, payload, progress FROM crdb_internal.system_jobs WHERE (
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload
)->'migration'
) IS NOT NULL AND status = 'running'
`).Scan(&firstID, &firstPayload, &firstProgress))

// Inject a second job for the same upgrade and ensure that doing so causes
// an error. This is pretty gnarly.
var secondID jobspb.JobID
@@ -125,21 +140,25 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
unique_rowid(),
status,
created,
payload,
progress,
NULL,
NULL,
created_by_type,
created_by_id,
claim_session_id,
claim_instance_id
FROM system.jobs
WHERE (
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload
)->'migration'
) IS NOT NULL
FROM crdb_internal.system_jobs
WHERE id = $1
)
RETURNING id;`).Scan(&secondID))
RETURNING id;`, firstID).Scan(&secondID))
// Insert the job payload and progress into the `system.job_info` table.
err := tc.Server(0).InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, secondID)
if err := infoStorage.WriteLegacyPayload(ctx, firstPayload); err != nil {
return err
}
return infoStorage.WriteLegacyProgress(ctx, firstProgress)
})
require.NoError(t, err)

// Make sure that the second job gets run in a timely manner.
runErr := make(chan error)
@@ -150,7 +169,7 @@ RETURNING id;`).Scan(&secondID))
fakeJobBlockChan := <-ch

// Ensure that we see the assertion error.
_, err := tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
_, err = tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
require.Regexp(t, "found multiple non-terminal jobs for version", err)

// Let the fake, erroneous job finish with an error.
@@ -392,9 +411,13 @@ func TestPauseMigration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// We're going to be migrating from startCV to endCV.
startCV := roachpb.Version{Major: 41}
endCV := roachpb.Version{Major: 42}
// clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs was chosen
// specifically so that all the migrations that introduce and backfill the new
// `system.job_info` have run by this point. In the future this startCV should
// be changed to V23_2Start and updated to the next Start key every time the
// compatibility window moves forward.
startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs
endCV := startCV + 1

type migrationEvent struct {
unblock chan<- error
@@ -405,19 +428,16 @@ func TestPauseMigration(t *testing.T) {
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV, startCV, false),
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Server: &server.TestingKnobs{
BinaryVersionOverride: startCV,
BinaryVersionOverride: clusterversion.ByKey(startCV),
BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
UpgradeManager: &upgradebase.TestingKnobs{
ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version {
return []roachpb.Version{to}
},
RegistryOverride: func(cv roachpb.Version) (upgradebase.Upgrade, bool) {
if cv != endCV {
if cv != clusterversion.ByKey(endCV) {
return nil, false
}
return upgrade.NewTenantUpgrade("test", cv, upgrade.NoPrecondition, func(
@@ -454,13 +474,13 @@ func TestPauseMigration(t *testing.T) {
var id int64
tdb.QueryRow(t, `
SELECT id
FROM system.jobs
FROM crdb_internal.system_jobs
WHERE (
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload
)->'migration'
) IS NOT NULL;`).
) IS NOT NULL AND status = 'running';`).
Scan(&id)
tdb.Exec(t, "PAUSE JOB $1", id)

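For reference, the invariant the updated tests now respect is that a job's payload and progress live in system.job_info rather than in system.jobs, so cloning a job means reading the combined view (crdb_internal.system_jobs) and then writing the payload and progress through the job-info API. Below is a minimal sketch of that two-step pattern, assuming the external test package and the same imports the test already uses; the helper name backfillJobInfo is illustrative and not part of this PR.

package upgrademanager_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/stretchr/testify/require"
)

// backfillJobInfo copies the payload and progress of srcID onto dstID,
// mirroring the pattern in the diff above: read through the
// crdb_internal.system_jobs view, then write via jobs.InfoStorageForJob
// inside a transaction so the rows land in system.job_info.
func backfillJobInfo(
	ctx context.Context, t *testing.T, tc *testcluster.TestCluster, srcID, dstID jobspb.JobID,
) {
	// crdb_internal.system_jobs joins system.jobs with system.job_info, so the
	// source job's payload and progress are visible here even though they are
	// no longer written to system.jobs itself.
	var payload, progress []byte
	require.NoError(t, tc.ServerConn(0).QueryRowContext(ctx,
		`SELECT payload, progress FROM crdb_internal.system_jobs WHERE id = $1`,
		srcID,
	).Scan(&payload, &progress))

	// Writes go through the job-info storage API, keyed by the destination
	// job's ID, inside a single transaction.
	require.NoError(t, tc.Server(0).InternalDB().(isql.DB).Txn(ctx,
		func(ctx context.Context, txn isql.Txn) error {
			info := jobs.InfoStorageForJob(txn, dstID)
			if err := info.WriteLegacyPayload(ctx, payload); err != nil {
				return err
			}
			return info.WriteLegacyProgress(ctx, progress)
		}))
}

The test performs the same two steps inline, rather than through a helper, when it injects the duplicate migration job.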