Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #218 from tinyspeck/am_slack-vitess-9-r13-backups
Browse files Browse the repository at this point in the history
add backup status to vtctld 9.0 branch
  • Loading branch information
ajm188 authored Jun 16, 2021
2 parents 479197f + 8a31392 commit ee64609
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 9 deletions.
18 changes: 12 additions & 6 deletions go/cmd/vtctldclient/internal/command/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ var GetBackups = &cobra.Command{
}

var getBackupsOptions = struct {
Limit uint32
OutputJSON bool
Limit uint32
Detailed bool
DetailedLimit uint32
OutputJSON bool
}{}

func commandGetBackups(cmd *cobra.Command, args []string) error {
Expand All @@ -49,15 +51,17 @@ func commandGetBackups(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

resp, err := client.GetBackups(commandCtx, &vtctldatapb.GetBackupsRequest{
Keyspace: keyspace,
Shard: shard,
Limit: getBackupsOptions.Limit,
Keyspace: keyspace,
Shard: shard,
Limit: getBackupsOptions.Limit,
Detailed: getBackupsOptions.Detailed,
DetailedLimit: getBackupsOptions.DetailedLimit,
})
if err != nil {
return err
}

if getBackupsOptions.OutputJSON {
if getBackupsOptions.OutputJSON || getBackupsOptions.Detailed {
data, err := cli.MarshalJSON(resp)
if err != nil {
return err
Expand All @@ -80,5 +84,7 @@ func commandGetBackups(cmd *cobra.Command, args []string) error {
func init() {
GetBackups.Flags().Uint32VarP(&getBackupsOptions.Limit, "limit", "l", 0, "Retrieve only the most recent N backups")
GetBackups.Flags().BoolVarP(&getBackupsOptions.OutputJSON, "json", "j", false, "Output backup info in JSON format rather than a list of backups")
GetBackups.Flags().BoolVar(&getBackupsOptions.Detailed, "detailed", false, "Get detailed backup info, such as the engine used for each backup, and its status. Implies --json.")
GetBackups.Flags().Uint32Var(&getBackupsOptions.DetailedLimit, "detailed-limit", 0, "Get detailed backup info for only the most recent N backups. Ignored if --detailed is not passed.")
Root.AddCommand(GetBackups)
}
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/azblobbackupstorage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ func (bh *AZBlobBackupHandle) ReadFile(ctx context.Context, filename string) (io
}), nil
}

// CheckFile is part of the BackupHandle interface. It is currently unimplemented.
func (bh *AZBlobBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
return false, nil
}

// AZBlobBackupStorage structs implements the BackupStorage interface for AZBlob
type AZBlobBackupStorage struct {
}
Expand Down
50 changes: 50 additions & 0 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package mysqlctl

import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand All @@ -34,6 +36,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -136,6 +139,53 @@ func Backup(ctx context.Context, params BackupParams) error {
return finishErr
}

// GetBackupInfo returns the name of the backupengine used to produce a given
// backup, based on the MANIFEST file from the backup, and the Status of the
// backup, based on the engine-specific definition of what makes a complete or
// valid backup.
func GetBackupInfo(ctx context.Context, bh backupstorage.BackupHandle) (engine string, status mysqlctlpb.BackupInfo_Status, err error) {
mfest, err := bh.ReadFile(ctx, backupManifestFileName)
if err != nil {
// (TODO|@ajm88): extend (backupstorage.BackupHandle).ReadFile to wrap
// certain errors as fs.ErrNotExist, and distinguish between INCOMPLETE
// (MANIFEST has not been written to storage) and INVALID (MANIFEST
// exists but can't be read/parsed).
return "", mysqlctlpb.BackupInfo_INCOMPLETE, err
}
defer mfest.Close()

mfestBytes, err := ioutil.ReadAll(mfest)
if err != nil {
return "", mysqlctlpb.BackupInfo_INVALID, err
}

// We unmarshal into a map here rather than using the GetBackupManifest
// because we are going to pass the raw mfestBytes to the particular
// backupengine implementation for further unmarshalling and processing.
//
// As a result, some of this code is duplicated with other functions in this
// package, but doing things this way has the benefit of minimizing extra
// calls to backupstorage.BackupHandle methods (which can be network-y and
// slow, or subject to external rate limits, etc).
var manifest map[string]interface{}
if err := json.Unmarshal(mfestBytes, &manifest); err != nil {
return "", mysqlctlpb.BackupInfo_INVALID, err
}

engine, ok := manifest["BackupMethod"].(string)
if !ok {
return "", mysqlctlpb.BackupInfo_INVALID, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "missing BackupMethod field in MANIFEST")
}

be, err := getBackupEngine(engine)
if err != nil {
return engine, mysqlctlpb.BackupInfo_COMPLETE, err
}

status, err = be.GetBackupStatus(ctx, bh, mfestBytes)
return engine, status, err
}

// ParseBackupName parses the backup name for a given dir/name, according to
// the format generated by mysqlctl.Backup. An error is returned only if the
// backup name does not have the expected number of parts; errors parsing the
Expand Down
10 changes: 10 additions & 0 deletions go/vt/mysqlctl/backupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
)

var (
Expand All @@ -44,6 +46,10 @@ var (
// BackupEngine is the interface to take a backup with a given engine.
type BackupEngine interface {
ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error)
// GetBackupStatus returns the status of a given backup, according to the
// specifics of the particular backupengine implementation. See the comments
// on the various implementations for more information.
GetBackupStatus(ctx context.Context, bh backupstorage.BackupHandle, mfestBytes []byte) (mysqlctlpb.BackupInfo_Status, error)
ShouldDrainForBackup() bool
}

Expand Down Expand Up @@ -119,6 +125,10 @@ var BackupRestoreEngineMap = make(map[string]BackupRestoreEngine)
// This must only be called after flags have been parsed.
func GetBackupEngine() (BackupEngine, error) {
name := *backupEngineImplementation
return getBackupEngine(name)
}

func getBackupEngine(name string) (BackupEngine, error) {
be, ok := BackupRestoreEngineMap[name]
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "unknown BackupEngine implementation %q", name)
Expand Down
6 changes: 6 additions & 0 deletions go/vt/mysqlctl/backupstorage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ type BackupHandle interface {
// ReadCloser is closed.
ReadFile(ctx context.Context, filename string) (io.ReadCloser, error)

// CheckFile checks if a file is included in a backup. Only works for
// read-only backups (created by ListBackups). Returns a boolean to indicate
// if the file exists, and an error. Variants of "file not found" errors do
// result in an error, but instead result in (false, nil).
CheckFile(ctx context.Context, filename string) (bool, error)

// concurrency.ErrorRecorder is embedded here to coordinate reporting and
// handling of errors among all the components involved in taking a backup.
concurrency.ErrorRecorder
Expand Down
10 changes: 10 additions & 0 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
)

const (
Expand Down Expand Up @@ -456,6 +458,14 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
return nil
}

// GetBackupStatus is part of the BackupEngine interface.
//
// This is currently not implemented for builtinbackupengine, so we always
// return UNKNOWN.
func (be *BuiltinBackupEngine) GetBackupStatus(ctx context.Context, bh backupstorage.BackupHandle, mfestBytes []byte) (mysqlctlpb.BackupInfo_Status, error) {
return mysqlctlpb.BackupInfo_UNKNOWN, nil
}

// ExecuteRestore restores from a backup. If the restore is successful
// we return the position from which replication should start
// otherwise an error is returned
Expand Down
6 changes: 6 additions & 0 deletions go/vt/mysqlctl/cephbackupstorage/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func (bh *CephBackupHandle) ReadFile(ctx context.Context, filename string) (io.R
return bh.client.GetObjectWithContext(ctx, bucket, object, minio.GetObjectOptions{})
}

// CheckFile is part of the BackupHandle interface. It is currently unimplemented.
func (bh *CephBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
// (TODO) when we implement this, use bh.client.StatObject
return false, nil
}

// CephBackupStorage implements BackupStorage for Ceph Cloud Storage.
type CephBackupStorage struct {
// client is the instance of the Ceph Cloud Storage Go client.
Expand Down
19 changes: 19 additions & 0 deletions go/vt/mysqlctl/filebackupstorage/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,25 @@ func (fbh *FileBackupHandle) ReadFile(ctx context.Context, filename string) (io.
return os.Open(p)
}

// CheckFile is part of the BackupHandle interface.
func (fbh *FileBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
if !fbh.readOnly {
return false, fmt.Errorf("CheckFile cannot be called on read-write backup")
}

p := path.Join(*FileBackupStorageRoot, fbh.dir, fbh.name, filename)
_, err := os.Stat(p)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}

return false, err
}

return true, nil
}

// FileBackupStorage implements BackupStorage for local file system.
type FileBackupStorage struct{}

Expand Down
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/gcsbackupstorage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ func (bh *GCSBackupHandle) ReadFile(ctx context.Context, filename string) (io.Re
return bh.client.Bucket(*bucket).Object(object).NewReader(ctx)
}

// CheckFile is part of the BackupHandle interface. It is currently unimplemented.
func (bh *GCSBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
return false, nil
}

// GCSBackupStorage implements BackupStorage for Google Cloud Storage.
type GCSBackupStorage struct {
// client is the instance of the Google Cloud Storage Go client.
Expand Down
27 changes: 27 additions & 0 deletions go/vt/mysqlctl/s3backupstorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -207,6 +208,32 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea
return out.Body, nil
}

// CheckFile is part of the backupstorage.BackupHandle interface.
func (bh *S3BackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
if !bh.readOnly {
return false, fmt.Errorf("CheckFile cannot be called on read-write backup")
}
object := objName(bh.dir, bh.name, filename)
_, err := bh.client.HeadObject(&s3.HeadObjectInput{
Bucket: bucket,
Key: object,
SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg,
SSECustomerKey: bh.bs.s3SSE.customerKey,
SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5,
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == "NotFound" {
return false, nil
}
}

return false, err
}

return true, nil
}

var _ backupstorage.BackupHandle = (*S3BackupHandle)(nil)

type S3ServerSideEncryption struct {
Expand Down
27 changes: 27 additions & 0 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
)

// XtrabackupEngine encapsulates the logic of the xtrabackup engine
Expand Down Expand Up @@ -370,6 +372,31 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
return replicationPosition, nil
}

// GetBackupStatus is part of the BackupEngine interface.
//
// For xtrabackup, we currently (we may want to expand this later) define a
// backup status as:
// - manifest can be read but contains invalid json => INVALID
// - the FileName in the manifest does not exist => INVALID
// - the FileName in the manifest exists => VALID
func (be *XtrabackupEngine) GetBackupStatus(ctx context.Context, bh backupstorage.BackupHandle, mfestBytes []byte) (mysqlctlpb.BackupInfo_Status, error) {
var manifest xtraBackupManifest
if err := json.Unmarshal(mfestBytes, &manifest); err != nil {
return mysqlctlpb.BackupInfo_INVALID, err
}

exists, err := bh.CheckFile(ctx, manifest.FileName)
if err != nil {
return mysqlctlpb.BackupInfo_INVALID, err
}

if !exists {
return mysqlctlpb.BackupInfo_INVALID, nil
}

return mysqlctlpb.BackupInfo_VALID, nil
}

// ExecuteRestore restores from a backup. Any error is returned.
func (be *XtrabackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) {

Expand Down
12 changes: 9 additions & 3 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/mysqlctl/mysqlctlproto"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -385,9 +386,14 @@ func (s *VtctldServer) GetBackups(ctx context.Context, req *vtctldatapb.GetBacku
bi.Shard = req.Shard

if req.Detailed {
if i >= backupsToSkipDetails { // nolint:staticcheck
// (TODO:@ajm188) Update backupengine/backupstorage implementations
// to get Status info for backups.
if i >= backupsToSkipDetails {
engine, status, err := mysqlctl.GetBackupInfo(ctx, bh)
if err != nil {
log.Warningf("error getting detailed backup info for %s/%s %s/%s: %s", bi.Keyspace, bi.Shard, bi.Directory, bi.Name, err)
}

bi.Engine = engine
bi.Status = status
}
}

Expand Down

0 comments on commit ee64609

Please sign in to comment.