xtrabackup: Make sure all files are closed before writing MANIFEST. (#5177)

* Update to latest GCS client.

Signed-off-by: Anthony Yeh <enisoc@planetscale.com>

* xtrabackup: Make sure all files are closed before writing MANIFEST.

We've observed a backup that was missing files, yet had a MANIFEST.
In the built-in backup engine, the contract was that the MANIFEST file
must not be written unless all files were confirmed to have been
uploaded successfully. In XtraBackup mode, we were not meeting this
contract because an error that occurred while closing a file would not
be noticed until after we had written the MANIFEST.

Signed-off-by: Anthony Yeh <enisoc@planetscale.com>
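For context on the pattern this fix relies on, here is a minimal, self-contained Go sketch (not the Vitess code itself; writeBackupFile and nopCloser are hypothetical names) of how a deferred Close error can be folded into a function's named return value, so the caller only proceeds to write a MANIFEST when every file closed cleanly:

package main

import (
	"fmt"
	"io"
	"log"
	"strings"
)

// nopCloser adapts an io.Writer into an io.WriteCloser whose Close never fails.
type nopCloser struct{ io.Writer }

func (nopCloser) Close() error { return nil }

// writeBackupFile copies src into wc and folds any Close error into the
// named return value, so the caller sees close failures as well.
func writeBackupFile(wc io.WriteCloser, src io.Reader) (finalErr error) {
	defer func() {
		if closeErr := wc.Close(); finalErr == nil {
			finalErr = closeErr
		} else if closeErr != nil {
			// We already have a write error; just log the close failure.
			log.Printf("error closing file: %v", closeErr)
		}
	}()
	_, err := io.Copy(wc, src)
	return err
}

func main() {
	var buf strings.Builder
	if err := writeBackupFile(nopCloser{&buf}, strings.NewReader("some data")); err != nil {
		// A write or close error surfaces here, so we would not write the MANIFEST.
		log.Fatalf("backup file failed: %v", err)
	}
	fmt.Println("all files closed cleanly; safe to write MANIFEST")
}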
enisoc authored Sep 10, 2019
1 parent 3b7f327 commit d53de4f
Showing 3 changed files with 90 additions and 60 deletions.
6 changes: 3 additions & 3 deletions go.mod
@@ -3,7 +3,7 @@ module vitess.io/vitess
go 1.12

require (
cloud.google.com/go v0.43.0
cloud.google.com/go v0.45.1
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/aws/aws-sdk-go v0.0.0-20180223184012-ebef4262e06a
github.com/boltdb/bolt v1.3.1 // indirect
@@ -22,6 +22,7 @@ require (
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.0-20170215233205-553a64147049
github.com/google/btree v1.0.0 // indirect
github.com/gorilla/websocket v0.0.0-20160912153041-2d1e4548da23
github.com/grpc-ecosystem/go-grpc-middleware v0.0.0-20190118093823-f849b5445de4
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20180418170936-39de4380c2e0
@@ -67,8 +68,7 @@ require (
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/tools v0.0.0-20190830154057-c17b040389b9
google.golang.org/api v0.7.0
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 // indirect
google.golang.org/api v0.9.0
google.golang.org/grpc v1.21.1
gopkg.in/asn1-ber.v1 v1.0.0-20150924051756-4e86f4367175 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
11 changes: 11 additions & 0 deletions go.sum
@@ -3,6 +3,12 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.43.0 h1:banaiRPAM8kUVYneOSkhgcDsLzEvL25FinuiSZaH/2w=
cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7pg=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
cloud.google.com/go v0.45.1 h1:lRi0CHyU+ytlvylOlFKKq0af6JncuyoRh1J+QJBqQx0=
cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
@@ -280,6 +286,9 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0 h1:9sdfJOzWlkqPltHAuzT2Cp+yrBeY1KRVYgms8soxMwM=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/api v0.9.0 h1:jbyannxz0XFD3zdjgrSUsaJbgpH4eTrkdhRChkHPfO8=
google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -293,6 +302,8 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190716160619-c506a9f90610/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 h1:iKtrH9Y8mcbADOP0YFaEMth7OfuHY9xHOwNj4znpM1A=
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8=
133 changes: 76 additions & 57 deletions go/vt/mysqlctl/xtrabackupengine.go
@@ -106,6 +106,15 @@ func (be *XtrabackupEngine) backupFileName() string {
return fileName
}

func closeFile(wc io.WriteCloser, fileName string, logger logutil.Logger, finalErr *error) {
if closeErr := wc.Close(); *finalErr == nil {
*finalErr = closeErr
} else if closeErr != nil {
// since we already have an error just log this
logger.Errorf("error closing file %v: %v", fileName, closeErr)
}
}

// ExecuteBackup returns a boolean that indicates if the backup is usable,
// and an overall error.
func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (complete bool, finalErr error) {
@@ -128,6 +137,56 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql
flavor := pos.GTIDSet.Flavor()
logger.Infof("Detected MySQL flavor: %v", flavor)

backupFileName := be.backupFileName()
numStripes := int(*xtrabackupStripes)

// Perform backups in a separate function, so deferred calls to Close() are
// all done before we continue to write the MANIFEST. This ensures that we
// do not write the MANIFEST unless all files were closed successfully,
// maintaining the contract that a MANIFEST file should only exist if the
// backup was created successfully.
replicationPosition, err := be.backupFiles(ctx, cnf, logger, bh, backupFileName, numStripes, flavor)
if err != nil {
return false, err
}

// open the MANIFEST
mwc, err := bh.AddFile(ctx, backupManifestFileName, 0)
if err != nil {
return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName)
}
defer closeFile(mwc, backupManifestFileName, logger, &finalErr)

// JSON-encode and write the MANIFEST
bm := &xtraBackupManifest{
// Common base fields
BackupManifest: BackupManifest{
BackupMethod: xtrabackupEngineName,
Position: replicationPosition,
FinishedTime: time.Now().UTC().Format(time.RFC3339),
},

// XtraBackup-specific fields
FileName: backupFileName,
StreamMode: *xtrabackupStreamMode,
SkipCompress: !*backupStorageCompress,
Params: *xtrabackupBackupFlags,
NumStripes: int32(numStripes),
StripeBlockSize: int32(*xtrabackupStripeBlockSize),
}

data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName)
}
if _, err := mwc.Write([]byte(data)); err != nil {
return false, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName)
}

return true, nil
}

func (be *XtrabackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, logger logutil.Logger, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) {
backupProgram := path.Join(*xtrabackupEnginePath, xtrabackupBinaryName)

flagsToExec := []string{"--defaults-file=" + cnf.path,
@@ -140,40 +199,32 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql
if *xtrabackupStreamMode != "" {
flagsToExec = append(flagsToExec, "--stream="+*xtrabackupStreamMode)
}

if *xtrabackupBackupFlags != "" {
flagsToExec = append(flagsToExec, strings.Fields(*xtrabackupBackupFlags)...)
}

backupFileName := be.backupFileName()
numStripes := int(*xtrabackupStripes)

destFiles, err := addStripeFiles(ctx, bh, backupFileName, numStripes, logger)
if err != nil {
return false, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName)
}
closeFile := func(wc io.WriteCloser, fileName string) {
if closeErr := wc.Close(); finalErr == nil {
finalErr = closeErr
} else if closeErr != nil {
// since we already have an error just log this
logger.Errorf("error closing file %v: %v", fileName, closeErr)
}
return replicationPosition, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName)
}
defer func() {
for _, file := range destFiles {
closeFile(file, backupFileName)
filename := backupFileName
for i, file := range destFiles {
if numStripes > 1 {
filename = stripeFileName(backupFileName, i)
}
closeFile(file, filename, logger, &finalErr)
}
}()

backupCmd := exec.CommandContext(ctx, backupProgram, flagsToExec...)
backupOut, err := backupCmd.StdoutPipe()
if err != nil {
return false, vterrors.Wrap(err, "cannot create stdout pipe")
return replicationPosition, vterrors.Wrap(err, "cannot create stdout pipe")
}
backupErr, err := backupCmd.StderrPipe()
if err != nil {
return false, vterrors.Wrap(err, "cannot create stderr pipe")
return replicationPosition, vterrors.Wrap(err, "cannot create stderr pipe")
}

destWriters := []io.Writer{}
@@ -188,7 +239,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql
if *backupStorageCompress {
compressor, err := pgzip.NewWriterLevel(writer, pgzip.BestSpeed)
if err != nil {
return false, vterrors.Wrap(err, "cannot create gzip compressor")
return replicationPosition, vterrors.Wrap(err, "cannot create gzip compressor")
}
compressor.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks)
writer = compressor
Expand All @@ -199,7 +250,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql
}

if err = backupCmd.Start(); err != nil {
return false, vterrors.Wrap(err, "unable to start backup")
return replicationPosition, vterrors.Wrap(err, "unable to start backup")
}

// Read stderr in the background, so we can log progress as xtrabackup runs.
@@ -240,20 +291,20 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql
blockSize = 1024
}
if _, err := copyToStripes(destWriters, backupOut, blockSize); err != nil {
return false, vterrors.Wrap(err, "cannot copy output from xtrabackup command")
return replicationPosition, vterrors.Wrap(err, "cannot copy output from xtrabackup command")
}

// Close compressor to flush it. After that all data is sent to the buffer.
for _, compressor := range destCompressors {
if err := compressor.Close(); err != nil {
return false, vterrors.Wrap(err, "cannot close gzip compressor")
return replicationPosition, vterrors.Wrap(err, "cannot close gzip compressor")
}
}

// Flush the buffer to finish writing on destination.
for _, buffer := range destBuffers {
if err = buffer.Flush(); err != nil {
return false, vterrors.Wrapf(err, "cannot flush destination: %v", backupFileName)
return replicationPosition, vterrors.Wrapf(err, "cannot flush destination: %v", backupFileName)
}
}

@@ -263,47 +314,15 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql
sterrOutput := stderrBuilder.String()

if err := backupCmd.Wait(); err != nil {
return false, vterrors.Wrap(err, "xtrabackup failed with error")
return replicationPosition, vterrors.Wrap(err, "xtrabackup failed with error")
}

replicationPosition, rerr := findReplicationPosition(sterrOutput, flavor, logger)
if rerr != nil {
return false, vterrors.Wrap(rerr, "backup failed trying to find replication position")
return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position")
}
// open the MANIFEST
mwc, err := bh.AddFile(ctx, backupManifestFileName, 0)
if err != nil {
return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName)
}
defer closeFile(mwc, backupManifestFileName)

// JSON-encode and write the MANIFEST
bm := &xtraBackupManifest{
// Common base fields
BackupManifest: BackupManifest{
BackupMethod: xtrabackupEngineName,
Position: replicationPosition,
FinishedTime: time.Now().UTC().Format(time.RFC3339),
},

// XtraBackup-specific fields
FileName: backupFileName,
StreamMode: *xtrabackupStreamMode,
SkipCompress: !*backupStorageCompress,
Params: *xtrabackupBackupFlags,
NumStripes: int32(numStripes),
StripeBlockSize: int32(*xtrabackupStripeBlockSize),
}

data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName)
}
if _, err := mwc.Write([]byte(data)); err != nil {
return false, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName)
}

return true, nil
return replicationPosition, nil
}

// ExecuteRestore restores from a backup. Any error is returned.
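To make the restructuring in the diff above easier to follow, here is a minimal sketch (hypothetical names such as stripeFile, backupFiles, and executeBackup; not the real Vitess code) of the control flow it moves to: the file-writing work runs in its own function, so its deferred Close calls, and any errors they fold into the named return value, have all resolved before the caller ever starts the MANIFEST step.

package main

import (
	"errors"
	"fmt"
)

// stripeFile stands in for one backup destination file.
type stripeFile struct{ failOnClose bool }

func (f *stripeFile) Close() error {
	if f.failOnClose {
		return errors.New("flush failed on close")
	}
	return nil
}

// backupFiles does the upload work; its deferred Close calls run before it
// returns and can overwrite the named return value with a close error.
func backupFiles(files []*stripeFile) (finalErr error) {
	defer func() {
		for _, f := range files {
			if closeErr := f.Close(); finalErr == nil {
				finalErr = closeErr
			}
		}
	}()
	// ... stream backup output into files here ...
	return nil
}

// executeBackup only reaches the MANIFEST step if backupFiles, including
// all of its deferred Close calls, succeeded.
func executeBackup(files []*stripeFile) error {
	if err := backupFiles(files); err != nil {
		return err
	}
	fmt.Println("writing MANIFEST")
	return nil
}

func main() {
	err := executeBackup([]*stripeFile{{failOnClose: true}})
	fmt.Println("backup result:", err) // the close error is reported; no MANIFEST is written
}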
