Skip to content

Commit

Permalink
importccl, backupccl: add import and restore tenant tests with multip…
Browse files Browse the repository at this point in the history
…le SQL pods

These tests validate that imports and restores can be
distributed across multiple SQL pods in a single tenant.

Release note: None
  • Loading branch information
rharding6373 committed Feb 25, 2022
1 parent 677a7d5 commit 6b556c9
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
for range bp.progCh {
}
}
log.Infof(ctx, "starting backup data")
if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{
TaskName: "backup-worker",
SpanOpt: stop.ChildSpan,
Expand Down
142 changes: 139 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ import (
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/proto"
pgx "github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4"
"github.com/kr/pretty"
"github.com/lib/pq"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -7136,14 +7136,13 @@ INSERT INTO baz.bar VALUES (110, 'a'), (210, 'b'), (310, 'c'), (410, 'd'), (510,
func TestBackupRestoreInsideTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
const numAccounts = 1

makeTenant := func(srv serverutils.TestServerInterface, tenant uint64) (*sqlutils.SQLRunner, func()) {
_, conn := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(tenant)})
cleanup := func() { conn.Close() }
return sqlutils.MakeSQLRunner(conn), cleanup
}

const numAccounts = 1
tc, systemDB, dir, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
_, _ = tc, systemDB
defer cleanupFn()
Expand Down Expand Up @@ -7250,6 +7249,143 @@ func TestBackupRestoreInsideTenant(t *testing.T) {
tenant10.Exec(t, `RESTORE data.bank FROM $1`, httpAddr)
systemDB.CheckQueryResults(t, `SELECT * FROM data.bank`, tenant10.QueryStr(t, `SELECT * FROM data.bank`))
})

})
}

// TestBackupRestoreInsideMultiPodTenant verifies that backup and restore work inside
// tenants with multiple SQL pods. Currently, verification
// that restore is distributed to all pods in the multi-pod tests must be done
// manually and by enabling logging and checking the log for messages containing
// "starting restore data" for nsql1 and nsql2.
// TODO(harding): Verify that backup and restore are distributed in test.
func TestBackupRestoreInsideMultiPodTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t, "may time out due to multiple servers")

const numAccounts = 1
const npods = 2

makeTenant := func(srv serverutils.TestServerInterface, tenant uint64, existing bool) (*sqlutils.SQLRunner, func()) {
_, conn := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(tenant), Existing: existing})
cleanup := func() { conn.Close() }
return sqlutils.MakeSQLRunner(conn), cleanup
}

tc, systemDB, dir, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
_, _ = tc, systemDB
defer cleanupFn()
srv := tc.Server(0)

// NB: tenant certs for 10, 11, and 20 are embedded. See:
_ = security.EmbeddedTenantIDs()

// Create another server.
tc2, systemDB2, cleanupEmptyCluster := backupRestoreTestSetupEmpty(t, singleNode, dir, InitManualReplication, base.TestClusterArgs{})
srv2 := tc2.Server(0)
defer cleanupEmptyCluster()

tenant10 := make([]*sqlutils.SQLRunner, npods)
cleanupT10 := make([]func(), npods)
tenant11 := make([]*sqlutils.SQLRunner, npods)
cleanupT11 := make([]func(), npods)
tenant10C2 := make([]*sqlutils.SQLRunner, npods)
cleanupT10C2 := make([]func(), npods)
for i := 0; i < npods; i++ {
tenant10[i], cleanupT10[i] = makeTenant(srv, 10, i != 0)
defer cleanupT10[i]()
tenant11[i], cleanupT11[i] = makeTenant(srv, 11, i != 0)
defer cleanupT11[i]()
tenant10C2[i], cleanupT10C2[i] = makeTenant(srv2, 10, i != 0)
defer cleanupT10C2[i]()
}

tenant11C2, cleanupT11C2 := makeTenant(srv2, 11, false)
defer cleanupT11C2()

tenant10[0].Exec(t, `CREATE DATABASE foo; CREATE TABLE foo.bar(i int primary key); INSERT INTO foo.bar VALUES (110), (210), (310)`)

t.Run("tenant-backup", func(t *testing.T) {
// This test uses this mock HTTP server to pass the backup files between tenants.
httpAddr, httpServerCleanup := makeInsecureHTTPServer(t)
defer httpServerCleanup()

tenant10[0].Exec(t, `BACKUP TO $1`, httpAddr)

t.Run("cluster-restore", func(t *testing.T) {
t.Run("into-same-tenant-id", func(t *testing.T) {
tenant10C2[0].Exec(t, `RESTORE FROM $1`, httpAddr)
tenant10C2[0].CheckQueryResults(t, `SELECT * FROM foo.bar`, tenant10[0].QueryStr(t, `SELECT * FROM foo.bar`))
})
t.Run("into-different-tenant-id", func(t *testing.T) {
tenant11C2.ExpectErr(t, `cannot cluster RESTORE backups taken from different tenant: 10`,
`RESTORE FROM $1`, httpAddr)
})
t.Run("into-system-tenant-id", func(t *testing.T) {
systemDB2.ExpectErr(t, `cannot cluster RESTORE backups taken from different tenant: 10`,
`RESTORE FROM $1`, httpAddr)
})
})

t.Run("database-restore", func(t *testing.T) {
t.Run("into-same-tenant-id", func(t *testing.T) {
tenant10[0].Exec(t, `CREATE DATABASE foo2`)
tenant10[0].Exec(t, `RESTORE foo.bar FROM $1 WITH into_db='foo2'`, httpAddr)
tenant10[0].CheckQueryResults(t, `SELECT * FROM foo2.bar`, tenant10[0].QueryStr(t, `SELECT * FROM foo.bar`))
})
t.Run("into-different-tenant-id", func(t *testing.T) {
tenant11[0].Exec(t, `CREATE DATABASE foo`)
tenant11[0].Exec(t, `RESTORE foo.bar FROM $1`, httpAddr)
tenant11[0].CheckQueryResults(t, `SELECT * FROM foo.bar`, tenant10[0].QueryStr(t, `SELECT * FROM foo.bar`))
})
t.Run("into-system-tenant-id", func(t *testing.T) {
systemDB.Exec(t, `CREATE DATABASE foo2`)
systemDB.Exec(t, `RESTORE foo.bar FROM $1 WITH into_db='foo2'`, httpAddr)
systemDB.CheckQueryResults(t, `SELECT * FROM foo2.bar`, tenant10[0].QueryStr(t, `SELECT * FROM foo.bar`))
})
})
})

t.Run("system-backup", func(t *testing.T) {
// This test uses this mock HTTP server to pass the backup files between tenants.
httpAddr, httpServerCleanup := makeInsecureHTTPServer(t)
defer httpServerCleanup()

systemDB.Exec(t, `BACKUP TO $1`, httpAddr)

tenant20C2, cleanupT20C2 := makeTenant(srv2, 20, false)
defer cleanupT20C2()

t.Run("cluster-restore", func(t *testing.T) {
t.Run("with-tenant", func(t *testing.T) {
// This is disallowed because the cluster restore includes other
// tenants, which can't be restored inside a tenant.
tenant20C2.ExpectErr(t, `only the system tenant can restore other tenants`,
`RESTORE FROM $1`, httpAddr)
})

t.Run("with-no-tenant", func(t *testing.T) {
// Now restore a cluster backup taken from a system tenant that
// hasn't created any tenants.
httpAddrEmpty, cleanupEmptyHTTPServer := makeInsecureHTTPServer(t)
defer cleanupEmptyHTTPServer()

_, emptySystemDB, cleanupEmptyCluster := backupRestoreTestSetupEmpty(t, singleNode,
dir, InitManualReplication, base.TestClusterArgs{})
defer cleanupEmptyCluster()

emptySystemDB.Exec(t, `BACKUP TO $1`, httpAddrEmpty)
tenant20C2.ExpectErr(t, `cannot cluster RESTORE backups taken from different tenant: system`,
`RESTORE FROM $1`, httpAddrEmpty)
})
})

t.Run("database-restore-into-tenant", func(t *testing.T) {
tenant10[0].Exec(t, `CREATE DATABASE data`)
tenant10[0].Exec(t, `RESTORE data.bank FROM $1`, httpAddr)
systemDB.CheckQueryResults(t, `SELECT * FROM data.bank`, tenant10[0].QueryStr(t, `SELECT * FROM data.bank`))
})
})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {
_ = rd.phaseGroup.Wait()
}
rd.phaseGroup = ctxgroup.WithContext(ctx)
log.Infof(ctx, "starting restore data")

entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers)
rd.sstCh = make(chan mergedSST, rd.numWorkers)
Expand Down
47 changes: 47 additions & 0 deletions pkg/ccl/importerccl/ccl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,53 @@ func TestImportInTenant(t *testing.T) {
t11.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"11", "22"}, {"33", "44"}, {"55", "66"}})
}

// TestImportInMultiServerTenant tests that import is successful in a tenant
// with multiple SQL pods.
// Currently, verification that the import is distributed needs to be done
// manually, and can be done by running the test with logging enabled and
// checking that the log contains the message "starting read import" for both
// instance nsql1 and nsql 2.
func TestImportInMultiServerTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
baseDir := testutils.TestDataPath(t)
args := base.TestServerArgs{ExternalIODir: baseDir}
tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ServerArgs: args})
defer tc.Stopper().Stop(ctx)

// Setup a SQL server on a tenant.
_, conn1 := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(10),
})
defer conn1.Close()
t1 := sqlutils.MakeSQLRunner(conn1)

// Setup another SQL server on the same tenant.
_, conn2 := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(10),
Existing: true,
})
defer conn2.Close()
t2 := sqlutils.MakeSQLRunner(conn2)

const userfileURI = "userfile://defaultdb.public.root/test.csv"
const userfile2URI = "userfile://defaultdb.public.root/test2.csv"
const createStmt = "CREATE TABLE foo (k INT PRIMARY KEY, v INT)"
const importStmt = "IMPORT INTO foo CSV DATA ($1, $2)"

// Upload files.
require.NoError(t, putUserfile(ctx, conn1, security.RootUserName(), userfileURI, []byte("10,2")))
require.NoError(t, putUserfile(ctx, conn2, security.RootUserName(), userfile2URI, []byte("11,22\n33,44\n55,66")))

t1.Exec(t, createStmt)
// TODO(harding): Verify that the import is distributed to both pods.
t1.Exec(t, importStmt, userfileURI, userfile2URI)
t1.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"10", "2"}, {"11", "22"}, {"33", "44"}, {"55", "66"}})
t2.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"10", "2"}, {"11", "22"}, {"33", "44"}, {"55", "66"}})
}

func putUserfile(
ctx context.Context, conn *gosql.DB, user security.SQLUsername, uri string, content []byte,
) error {
Expand Down

0 comments on commit 6b556c9

Please sign in to comment.