Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/ccl/backupccl: Remove TestBackupRestoreControlJob #99039

Merged
merged 1 commit into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,6 @@ go_test(
"//pkg/cloud/gcp",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
Expand Down Expand Up @@ -256,7 +254,6 @@ go_test(
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catenumpb",
"//pkg/sql/catalog/descbuilder",
"//pkg/sql/catalog/descidgen",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
Expand Down Expand Up @@ -318,7 +315,6 @@ go_test(
"@com_github_cockroachdb_pebble//sstable",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_kr_pretty//:pretty",
Expand Down
192 changes: 0 additions & 192 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud/gcp"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
Expand All @@ -75,7 +73,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
Expand Down Expand Up @@ -114,7 +111,6 @@ import (
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"github.com/gogo/protobuf/proto"
pgx "github.com/jackc/pgx/v4"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -1835,194 +1831,6 @@ func TestBackupRestoreResume(t *testing.T) {
})
}

// TestBackupRestoreControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
// work as intended on backup and restore jobs.
//
// NOTE(review): the test is skipped via skip.WithIssue below; it is retained in
// the tree so it can be reinstated once the referenced issue is resolved.
func TestBackupRestoreControlJob(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	// Permanently skipped pending issue #24136.
	skip.WithIssue(t, 24136)

	// force every call to update
	defer jobs.TestingSetProgressThresholds()()

	serverArgs := base.TestServerArgs{
		// NOTE(review): span configs appear to be disabled because the test
		// installs zone configs directly through the config testing hooks in
		// init below — confirm this is still the right knob if reinstating.
		DisableSpanConfigs: true,

		Knobs: base.TestingKnobs{
			// Short job intervals so PAUSE/RESUME/CANCEL take effect quickly.
			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
		},
	}
	// Disable external processing of mutations so that the final check of
	// crdb_internal.tables is guaranteed to not be cleaned up. Although this
	// was never observed by a stress test, it is here for safety.
	serverArgs.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{
		// TODO (lucy): if/when this test gets reinstated, figure out what knobs are
		// needed.
	}

	// PAUSE JOB and CANCEL JOB are racy in that it's hard to guarantee that the
	// job is still running when executing a PAUSE or CANCEL--or that the job has
	// even started running. To synchronize, we install a store response filter
	// which does a blocking receive whenever it encounters an export or import
	// response. Below, when we want to guarantee the job is in progress, we do
	// exactly one blocking send. When this send completes, we know the job has
	// started, as we've seen one export or import response. We also know the job
	// has not finished, because we're blocking all future export and import
	// responses until we close the channel, and our backup or restore is large
	// enough that it will generate more than one export or import response.
	var allowResponse chan struct{}
	params := base.TestClusterArgs{ServerArgs: serverArgs}
	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
		// jobutils.RunJob coordinates with this filter through allowResponse.
		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
	}

	// We need lots of ranges to see what happens when they get chunked. Rather
	// than make a huge table, dial down the zone config for the bank table.
	init := func(tc *testcluster.TestCluster) {
		config.TestingSetupZoneConfigHook(tc.Stopper())
		s := tc.Servers[0]
		idgen := descidgen.NewGenerator(s.ClusterSettings(), keys.SystemSQLCodec, s.DB())
		v, err := idgen.PeekNextUniqueDescID(context.Background())
		if err != nil {
			t.Fatal(err)
		}
		last := config.ObjectID(v)
		zoneConfig := zonepb.DefaultZoneConfig()
		// A tiny max range size forces the bank table to split into many ranges.
		zoneConfig.RangeMaxBytes = proto.Int64(5000)
		// Target the next descriptor ID to be allocated — presumably the bank
		// table created by backupRestoreTestSetupWithParams below; confirm if
		// reinstating.
		config.TestingSetZoneConfig(last+1, zoneConfig)
	}
	const numAccounts = 1000
	_, outerDB, _, cleanup := backupRestoreTestSetupWithParams(t, multiNode, numAccounts, init, params)
	defer cleanup()

	sqlDB := sqlutils.MakeSQLRunner(outerDB.DB)

	// Pause and resume a backup and a restore of a table with an outbound
	// foreign key, then verify fingerprints match between original and restored
	// tables.
	t.Run("foreign", func(t *testing.T) {
		foreignDir := "nodelocal://0/foreign"
		sqlDB.Exec(t, `CREATE DATABASE orig_fkdb`)
		sqlDB.Exec(t, `CREATE DATABASE restore_fkdb`)
		sqlDB.Exec(t, `CREATE TABLE orig_fkdb.fk (i INT REFERENCES data.bank)`)
		// Generate some FK data with splits so backup/restore block correctly.
		for i := 0; i < 10; i++ {
			sqlDB.Exec(t, `INSERT INTO orig_fkdb.fk (i) VALUES ($1)`, i)
			sqlDB.Exec(t, `ALTER TABLE orig_fkdb.fk SPLIT AT VALUES ($1)`, i)
		}

		for i, query := range []string{
			`BACKUP TABLE orig_fkdb.fk TO $1`,
			`RESTORE TABLE orig_fkdb.fk FROM $1 WITH OPTIONS (skip_missing_foreign_keys, into_db='restore_fkdb')`,
		} {
			// RunJob pauses the in-flight job; the expected outcome is the
			// "job paused" error, after which we resume and wait for success.
			jobID, err := jobutils.RunJob(t, sqlDB, &allowResponse, []string{"PAUSE"}, query, foreignDir)
			if !testutils.IsError(err, "job paused") {
				t.Fatalf("%d: expected 'job paused' error, but got %+v", i, err)
			}
			sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, jobID))
			jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
		}

		sqlDB.CheckQueryResults(t,
			`SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE orig_fkdb.fk`,
			sqlDB.QueryStr(t, `SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE restore_fkdb.fk`),
		)
	})

	// Pause a backup and a restore mid-flight, check the OFFLINE-table
	// behavior while the restore is paused, then resume and verify the
	// restored data matches the source.
	t.Run("pause", func(t *testing.T) {
		pauseDir := "nodelocal://0/pause"
		noOfflineDir := "nodelocal://0/no-offline"
		sqlDB.Exec(t, `CREATE DATABASE pause`)

		for i, query := range []string{
			`BACKUP DATABASE data TO $1`,
			`RESTORE TABLE data.* FROM $1 WITH OPTIONS (into_db='pause')`,
		} {
			// Pause, resume, and pause again before letting the job finish.
			ops := []string{"PAUSE", "RESUME", "PAUSE"}
			jobID, err := jobutils.RunJob(t, sqlDB, &allowResponse, ops, query, pauseDir)
			if !testutils.IsError(err, "job paused") {
				t.Fatalf("%d: expected 'job paused' error, but got %+v", i, err)
			}
			// i > 0 is the RESTORE iteration: the restoring table is OFFLINE
			// while the job is paused.
			if i > 0 {
				sqlDB.CheckQueryResults(t,
					`SELECT name FROM crdb_internal.tables WHERE database_name = 'pause' AND state = 'OFFLINE'`,
					[][]string{{"bank"}},
				)
				// Ensure that OFFLINE tables can be accessed to set zone configs.
				sqlDB.Exec(t, `ALTER TABLE pause.bank CONFIGURE ZONE USING constraints='[+dc=dc1]'`)
				// Ensure that OFFLINE tables are not included in a BACKUP.
				sqlDB.ExpectErr(t, `table "pause.public.bank" does not exist`, `BACKUP pause.bank TO $1`, noOfflineDir)
				sqlDB.Exec(t, `BACKUP pause.* TO $1`, noOfflineDir)
				// The wildcard backup succeeds but contains nothing, since the
				// only table in the database is OFFLINE.
				sqlDB.CheckQueryResults(t, fmt.Sprintf("SHOW BACKUP '%s'", noOfflineDir), [][]string{})
			}
			sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, jobID))
			jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
		}
		sqlDB.CheckQueryResults(t,
			`SELECT count(*) FROM pause.bank`,
			sqlDB.QueryStr(t, `SELECT count(*) FROM data.bank`),
		)

		sqlDB.CheckQueryResults(t,
			`SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE pause.bank`,
			sqlDB.QueryStr(t, `SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank`),
		)
	})

	// Pause a restore, create a table in the restoring database, cancel the
	// restore, and verify the user-created table survives while the
	// partially-restored one is cleaned up.
	t.Run("pause-cancel", func(t *testing.T) {
		backupDir := "nodelocal://0/backup"

		// nil ops: let the backup run to completion undisturbed.
		backupJobID, err := jobutils.RunJob(t, sqlDB, &allowResponse, nil, "BACKUP DATABASE data TO $1", backupDir)
		if err != nil {
			t.Fatalf("error while running backup %+v", err)
		}
		jobutils.WaitForJobToSucceed(t, sqlDB, backupJobID)

		sqlDB.Exec(t, `DROP DATABASE data`)

		query := `RESTORE DATABASE data FROM $1`
		ops := []string{"PAUSE"}
		jobID, err := jobutils.RunJob(t, sqlDB, &allowResponse, ops, query, backupDir)
		if !testutils.IsError(err, "job paused") {
			t.Fatalf("expected 'job paused' error, but got %+v", err)
		}

		// Create a table while the RESTORE is in progress on the database that was
		// created by the restore.
		sqlDB.Exec(t, `CREATE TABLE data.new_table (a int)`)

		// Do things while the job is paused.
		sqlDB.Exec(t, `CANCEL JOB $1`, jobID)

		// Ensure that the tables created by the user, during the RESTORE are
		// still present. Also ensure that the table that was being restored (bank)
		// is not.
		sqlDB.Exec(t, `USE data;`)
		sqlDB.CheckQueryResults(t, `SHOW TABLES;`, [][]string{{"public", "new_table", "table"}})
	})

	// Cancel an in-flight backup and restore, then verify the same operation
	// can be re-run successfully afterwards.
	t.Run("cancel", func(t *testing.T) {
		cancelDir := "nodelocal://0/cancel"
		sqlDB.Exec(t, `CREATE DATABASE cancel`)

		for i, query := range []string{
			`BACKUP DATABASE data TO $1`,
			`RESTORE TABLE data.* FROM $1 WITH OPTIONS (into_db='cancel')`,
		} {
			if _, err := jobutils.RunJob(
				t, sqlDB, &allowResponse, []string{"cancel"}, query, cancelDir,
			); !testutils.IsError(err, "job canceled") {
				t.Fatalf("%d: expected 'job canceled' error, but got %+v", i, err)
			}
			// Check that executing the same backup or restore succeeds. This won't
			// work if the first backup or restore was not successfully canceled.
			sqlDB.Exec(t, query, cancelDir)
		}
		// Verify the canceled RESTORE added some DROP tables.
		sqlDB.CheckQueryResults(t,
			`SELECT name FROM crdb_internal.tables WHERE database_name = 'cancel' AND state = 'DROP'`,
			[][]string{{"bank"}},
		)
	})
}

func TestBackupRestoreUserDefinedSchemas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down