Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Point in time recovery: fix cross-tablet GTID evaluation #13555

Merged
merged 16 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/vtctldclient/command/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func init() {
ApplySchema.Flags().StringVar(&applySchemaOptions.CallerID, "caller-id", "", "Effective caller ID used for the operation and should map to an ACL name which grants this identity the necessary permissions to perform the operation (this is only necessary when strict table ACLs are used).")
ApplySchema.Flags().StringArrayVar(&applySchemaOptions.SQL, "sql", nil, "Semicolon-delimited, repeatable SQL commands to apply. Exactly one of --sql|--sql-file is required.")
ApplySchema.Flags().StringVar(&applySchemaOptions.SQLFile, "sql-file", "", "Path to a file containing semicolon-delimited SQL commands to apply. Exactly one of --sql|--sql-file is required.")
ApplySchema.Flags().Int64Var(&applySchemaOptions.BatchSize, "batch-size", 0, "How many queries to batch together. Only applicabel when all queries are CREATE TABLE|VIEW")
ApplySchema.Flags().Int64Var(&applySchemaOptions.BatchSize, "batch-size", 0, "How many queries to batch together. Only applicable when all queries are CREATE TABLE|VIEW")

Root.AddCommand(ApplySchema)

Expand Down
12 changes: 12 additions & 0 deletions go/test/endtoend/backup/pitr/backup_pitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,15 @@ func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) {
}
backup.ExecTestIncrementalBackupAndRestoreToTimestamp(t, tcase)
}

// TestIncrementalBackupOnTwoTablets runs a series of interleaved backups on two different replicas: full and incremental.
// Specifically, it's designed to test how incremental backups are taken by interleaved replicas, so that they successfully build on
// one another.
func TestIncrementalBackupOnTwoTablets(t *testing.T) {
tcase := &backup.PITRTestCase{
Name: "BuiltinBackup",
SetupType: backup.BuiltinBackup,
ComprssDetails: nil,
}
backup.ExecTestIncrementalBackupOnTwoTablets(t, tcase)
}
153 changes: 109 additions & 44 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ var (
}

vtInsertTest = `
create table vt_insert_test (
id bigint auto_increment,
msg varchar(64),
primary key (id)
) Engine=InnoDB`
create table vt_insert_test (
id bigint auto_increment,
msg varchar(64),
primary key (id)
) Engine=InnoDB
`
)

type CompressionDetails struct {
Expand Down Expand Up @@ -163,11 +164,13 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
commonTabletArg = append(commonTabletArg, getCompressorArgs(cDetails)...)

var mysqlProcs []*exec.Cmd
tabletTypes := map[int]string{
0: "primary",
1: "replica",
2: "rdonly",
}
for i := 0; i < 3; i++ {
tabletType := "replica"
if i == 0 {
tabletType = "primary"
}
tabletType := tabletTypes[i]
tablet := localCluster.NewVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
Expand Down Expand Up @@ -220,13 +223,16 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
if err := localCluster.VtctlclientProcess.InitTablet(replica1, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
}
if err := localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
}
vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory)
_, err = vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
if err != nil {
return 1, err
}

for _, tablet := range []cluster.Vttablet{*primary, *replica1} {
for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} {
if err := tablet.VttabletProcess.Setup(); err != nil {
return 1, err
}
Expand Down Expand Up @@ -1069,40 +1075,30 @@ func terminateRestore(t *testing.T) {
assert.True(t, found, "Restore message not found")
}

func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, tabletType string) (backups []string, destroy func(t *testing.T)) {
restoreWaitForBackup(t, tabletType, nil, true)
verifyInitialReplication(t)

err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias)
require.Nil(t, err)

backups = localCluster.VerifyBackupCount(t, shardKsName, 1)

verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())
func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backups []string) {
replica := getReplica(t, replicaIndex)

err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second)
err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias)
require.Nil(t, err)

err = replica2.VttabletProcess.TearDown()
backups, err = localCluster.ListBackups(shardKsName)
require.Nil(t, err)

err = localCluster.VtctlclientProcess.ExecuteCommand("DeleteTablet", replica2.Alias)
require.Nil(t, err)
verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())

destroy = func(t *testing.T) {
verifyAfterRemovingBackupNoBackupShouldBePresent(t, backups)
}
return backups, destroy
return backups
}

func GetReplicaPosition(t *testing.T) string {
pos, _ := cluster.GetPrimaryPosition(t, *replica1, hostname)
func GetReplicaPosition(t *testing.T, replicaIndex int) string {
replica := getReplica(t, replicaIndex)
pos, _ := cluster.GetPrimaryPosition(t, *replica, hostname)
return pos
}

func GetReplicaGtidPurged(t *testing.T) string {
func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string {
replica := getReplica(t, replicaIndex)
query := "select @@global.gtid_purged as gtid_purged"
rs, err := replica1.VttabletProcess.QueryTablet(query, keyspaceName, true)
rs, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
Expand Down Expand Up @@ -1135,13 +1131,62 @@ func ReadRowsFromPrimary(t *testing.T) (msgs []string) {
return ReadRowsFromTablet(t, primary)
}

func ReadRowsFromReplica(t *testing.T) (msgs []string) {
return ReadRowsFromTablet(t, replica1)
func getReplica(t *testing.T, replicaIndex int) *cluster.Vttablet {
switch replicaIndex {
case 0:
return replica1
case 1:
return replica2
default:
assert.Failf(t, "invalid replica index", "index=%d", replicaIndex)
return nil
}
}

func ReadRowsFromReplica(t *testing.T, replicaIndex int) (msgs []string) {
return ReadRowsFromTablet(t, getReplica(t, replicaIndex))
}

// FlushBinaryLogsOnReplica issues `FLUSH BINARY LOGS` <count> times
func FlushBinaryLogsOnReplica(t *testing.T, replicaIndex int, count int) {
replica := getReplica(t, replicaIndex)
query := "flush binary logs"
for i := 0; i < count; i++ {
_, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)
}
}

// FlushAndPurgeBinaryLogsOnReplica intentionally loses all existing binary logs. It flushes into a new binary log
// and immediately purges all previous logs.
// This is used to lose information.
func FlushAndPurgeBinaryLogsOnReplica(t *testing.T, replicaIndex int) (lastBinlog string) {
FlushBinaryLogsOnReplica(t, replicaIndex, 1)

replica := getReplica(t, replicaIndex)
{
query := "show binary logs"
rs, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)
require.NotEmpty(t, rs.Rows)
for _, row := range rs.Rows {
// binlog file name is first column
lastBinlog = row[0].ToString()
}
}
{
query, err := sqlparser.ParseAndBind("purge binary logs to %a", sqltypes.StringBindVariable(lastBinlog))
require.NoError(t, err)
_, err = replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)
}
return lastBinlog
}

func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.BackupManifest) {
// reading manifest
data, err := os.ReadFile(backupLocation + "/MANIFEST")
fullPath := backupLocation + "/MANIFEST"
data, err := os.ReadFile(fullPath)
require.NoErrorf(t, err, "error while reading MANIFEST %v", err)

// parsing manifest
Expand All @@ -1151,19 +1196,19 @@ func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.B
return manifest
}

func TestReplicaFullBackup(t *testing.T) (manifest *mysqlctl.BackupManifest, destroy func(t *testing.T)) {
backups, destroy := vtctlBackupReplicaNoDestroyNoWrites(t, "replica")
func TestReplicaFullBackup(t *testing.T, replicaIndex int) (manifest *mysqlctl.BackupManifest) {
backups := vtctlBackupReplicaNoDestroyNoWrites(t, replicaIndex)

backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backups[len(backups)-1]
return readManifestFile(t, backupLocation), destroy
return readManifestFile(t, backupLocation)
}

func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
incrementalFromPosArg := "auto"
if !incrementalFromPos.IsZero() {
incrementalFromPosArg = replication.EncodePosition(incrementalFromPos)
}
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPosArg, replica1.Alias)
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPosArg, replica.Alias)
if expectError != "" {
require.Errorf(t, err, "expected: %v", expectError)
require.Contains(t, output, expectError)
Expand All @@ -1173,23 +1218,43 @@ func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos replication.P

backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())
verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())
backupName = backups[len(backups)-1]
backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName
return readManifestFile(t, backupLocation), backupName
}

func TestReplicaRestoreToPos(t *testing.T, restoreToPos replication.Position, expectError string) {
func TestReplicaIncrementalBackup(t *testing.T, replicaIndex int, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
replica := getReplica(t, replicaIndex)
return testReplicaIncrementalBackup(t, replica, incrementalFromPos, expectError)
}

func TestReplicaFullRestore(t *testing.T, replicaIndex int, expectError string) {
replica := getReplica(t, replicaIndex)

output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", replica.Alias)
if expectError != "" {
require.Errorf(t, err, "expected: %v", expectError)
require.Contains(t, output, expectError)
return
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars())
}

func TestReplicaRestoreToPos(t *testing.T, replicaIndex int, restoreToPos replication.Position, expectError string) {
replica := getReplica(t, replicaIndex)

require.False(t, restoreToPos.IsZero())
restoreToPosArg := replication.EncodePosition(restoreToPos)
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--restore-to-pos", restoreToPosArg, replica1.Alias)
output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--", "--restore_to_pos", restoreToPosArg, replica.Alias)
if expectError != "" {
require.Errorf(t, err, "expected: %v", expectError)
require.Contains(t, output, expectError)
return
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars())
verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars())
}

func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, expectError string) {
Expand Down
Loading
Loading