Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

add backup status to vtctld 9.0 branch #218

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions go/cmd/vtctldclient/internal/command/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ var GetBackups = &cobra.Command{
}

var getBackupsOptions = struct {
Limit uint32
OutputJSON bool
Limit uint32
Detailed bool
DetailedLimit uint32
OutputJSON bool
}{}

func commandGetBackups(cmd *cobra.Command, args []string) error {
Expand All @@ -49,15 +51,17 @@ func commandGetBackups(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

resp, err := client.GetBackups(commandCtx, &vtctldatapb.GetBackupsRequest{
Keyspace: keyspace,
Shard: shard,
Limit: getBackupsOptions.Limit,
Keyspace: keyspace,
Shard: shard,
Limit: getBackupsOptions.Limit,
Detailed: getBackupsOptions.Detailed,
DetailedLimit: getBackupsOptions.DetailedLimit,
})
if err != nil {
return err
}

if getBackupsOptions.OutputJSON {
if getBackupsOptions.OutputJSON || getBackupsOptions.Detailed {
data, err := cli.MarshalJSON(resp)
if err != nil {
return err
Expand All @@ -80,5 +84,7 @@ func commandGetBackups(cmd *cobra.Command, args []string) error {
func init() {
GetBackups.Flags().Uint32VarP(&getBackupsOptions.Limit, "limit", "l", 0, "Retrieve only the most recent N backups")
GetBackups.Flags().BoolVarP(&getBackupsOptions.OutputJSON, "json", "j", false, "Output backup info in JSON format rather than a list of backups")
GetBackups.Flags().BoolVar(&getBackupsOptions.Detailed, "detailed", false, "Get detailed backup info, such as the engine used for each backup, and its status. Implies --json.")
GetBackups.Flags().Uint32Var(&getBackupsOptions.DetailedLimit, "detailed-limit", 0, "Get detailed backup info for only the most recent N backups. Ignored if --detailed is not passed.")
Root.AddCommand(GetBackups)
}
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/azblobbackupstorage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ func (bh *AZBlobBackupHandle) ReadFile(ctx context.Context, filename string) (io
}), nil
}

// CheckFile is part of the BackupHandle interface. It is currently unimplemented.
func (bh *AZBlobBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
return false, nil
}

// AZBlobBackupStorage structs implements the BackupStorage interface for AZBlob
type AZBlobBackupStorage struct {
}
Expand Down
50 changes: 50 additions & 0 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package mysqlctl

import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand All @@ -34,6 +36,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -136,6 +139,53 @@ func Backup(ctx context.Context, params BackupParams) error {
return finishErr
}

// GetBackupInfo returns the name of the backupengine used to produce a given
// backup, based on the MANIFEST file from the backup, and the Status of the
// backup, based on the engine-specific definition of what makes a complete or
// valid backup.
func GetBackupInfo(ctx context.Context, bh backupstorage.BackupHandle) (engine string, status mysqlctlpb.BackupInfo_Status, err error) {
mfest, err := bh.ReadFile(ctx, backupManifestFileName)
if err != nil {
// (TODO|@ajm88): extend (backupstorage.BackupHandle).ReadFile to wrap
// certain errors as fs.ErrNotExist, and distinguish between INCOMPLETE
// (MANIFEST has not been written to storage) and INVALID (MANIFEST
// exists but can't be read/parsed).
return "", mysqlctlpb.BackupInfo_INCOMPLETE, err
}
defer mfest.Close()

mfestBytes, err := ioutil.ReadAll(mfest)
if err != nil {
return "", mysqlctlpb.BackupInfo_INVALID, err
}

// We unmarshal into a map here rather than using the GetBackupManifest
// because we are going to pass the raw mfestBytes to the particular
// backupengine implementation for further unmarshalling and processing.
//
// As a result, some of this code is duplicated with other functions in this
// package, but doing things this way has the benefit of minimizing extra
// calls to backupstorage.BackupHandle methods (which can be network-y and
// slow, or subject to external rate limits, etc).
var manifest map[string]interface{}
if err := json.Unmarshal(mfestBytes, &manifest); err != nil {
return "", mysqlctlpb.BackupInfo_INVALID, err
}

engine, ok := manifest["BackupMethod"].(string)
if !ok {
return "", mysqlctlpb.BackupInfo_INVALID, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "missing BackupMethod field in MANIFEST")
}

be, err := getBackupEngine(engine)
if err != nil {
return engine, mysqlctlpb.BackupInfo_COMPLETE, err
}

status, err = be.GetBackupStatus(ctx, bh, mfestBytes)
return engine, status, err
}

// ParseBackupName parses the backup name for a given dir/name, according to
// the format generated by mysqlctl.Backup. An error is returned only if the
// backup name does not have the expected number of parts; errors parsing the
Expand Down
10 changes: 10 additions & 0 deletions go/vt/mysqlctl/backupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
)

var (
Expand All @@ -44,6 +46,10 @@ var (
// BackupEngine is the interface to take a backup with a given engine.
type BackupEngine interface {
ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error)
// GetBackupStatus returns the status of a given backup, according to the
// specifics of the particular backupengine implementation. See the comments
// on the various implementations for more information.
GetBackupStatus(ctx context.Context, bh backupstorage.BackupHandle, mfestBytes []byte) (mysqlctlpb.BackupInfo_Status, error)
ShouldDrainForBackup() bool
}

Expand Down Expand Up @@ -119,6 +125,10 @@ var BackupRestoreEngineMap = make(map[string]BackupRestoreEngine)
// This must only be called after flags have been parsed.
func GetBackupEngine() (BackupEngine, error) {
name := *backupEngineImplementation
return getBackupEngine(name)
}

func getBackupEngine(name string) (BackupEngine, error) {
be, ok := BackupRestoreEngineMap[name]
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "unknown BackupEngine implementation %q", name)
Expand Down
6 changes: 6 additions & 0 deletions go/vt/mysqlctl/backupstorage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ type BackupHandle interface {
// ReadCloser is closed.
ReadFile(ctx context.Context, filename string) (io.ReadCloser, error)

// CheckFile checks if a file is included in a backup. Only works for
// read-only backups (created by ListBackups). Returns a boolean to indicate
// if the file exists, and an error. Variants of "file not found" errors do
// result in an error, but instead result in (false, nil).
CheckFile(ctx context.Context, filename string) (bool, error)

// concurrency.ErrorRecorder is embedded here to coordinate reporting and
// handling of errors among all the components involved in taking a backup.
concurrency.ErrorRecorder
Expand Down
10 changes: 10 additions & 0 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
)

const (
Expand Down Expand Up @@ -456,6 +458,14 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
return nil
}

// GetBackupStatus is part of the BackupEngine interface.
//
// This is currently not implemented for builtinbackupengine, so we always
// return UNKNOWN.
func (be *BuiltinBackupEngine) GetBackupStatus(ctx context.Context, bh backupstorage.BackupHandle, mfestBytes []byte) (mysqlctlpb.BackupInfo_Status, error) {
return mysqlctlpb.BackupInfo_UNKNOWN, nil
}

// ExecuteRestore restores from a backup. If the restore is successful
// we return the position from which replication should start
// otherwise an error is returned
Expand Down
6 changes: 6 additions & 0 deletions go/vt/mysqlctl/cephbackupstorage/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func (bh *CephBackupHandle) ReadFile(ctx context.Context, filename string) (io.R
return bh.client.GetObjectWithContext(ctx, bucket, object, minio.GetObjectOptions{})
}

// CheckFile is part of the BackupHandle interface. It is currently unimplemented.
func (bh *CephBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
// (TODO) when we implement this, use bh.client.StatObject
return false, nil
}

// CephBackupStorage implements BackupStorage for Ceph Cloud Storage.
type CephBackupStorage struct {
// client is the instance of the Ceph Cloud Storage Go client.
Expand Down
19 changes: 19 additions & 0 deletions go/vt/mysqlctl/filebackupstorage/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,25 @@ func (fbh *FileBackupHandle) ReadFile(ctx context.Context, filename string) (io.
return os.Open(p)
}

// CheckFile is part of the BackupHandle interface.
func (fbh *FileBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
if !fbh.readOnly {
return false, fmt.Errorf("CheckFile cannot be called on read-write backup")
}

p := path.Join(*FileBackupStorageRoot, fbh.dir, fbh.name, filename)
_, err := os.Stat(p)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}

return false, err
}

return true, nil
}

// FileBackupStorage implements BackupStorage for local file system.
type FileBackupStorage struct{}

Expand Down
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/gcsbackupstorage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ func (bh *GCSBackupHandle) ReadFile(ctx context.Context, filename string) (io.Re
return bh.client.Bucket(*bucket).Object(object).NewReader(ctx)
}

// CheckFile is part of the BackupHandle interface. It is currently unimplemented.
func (bh *GCSBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
return false, nil
}

// GCSBackupStorage implements BackupStorage for Google Cloud Storage.
type GCSBackupStorage struct {
// client is the instance of the Google Cloud Storage Go client.
Expand Down
27 changes: 27 additions & 0 deletions go/vt/mysqlctl/s3backupstorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -207,6 +208,32 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea
return out.Body, nil
}

// CheckFile is part of the backupstorage.BackupHandle interface.
func (bh *S3BackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
if !bh.readOnly {
return false, fmt.Errorf("CheckFile cannot be called on read-write backup")
}
object := objName(bh.dir, bh.name, filename)
_, err := bh.client.HeadObject(&s3.HeadObjectInput{
Bucket: bucket,
Key: object,
SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg,
SSECustomerKey: bh.bs.s3SSE.customerKey,
SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5,
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == "NotFound" {
return false, nil
}
}

return false, err
}

return true, nil
}

var _ backupstorage.BackupHandle = (*S3BackupHandle)(nil)

type S3ServerSideEncryption struct {
Expand Down
27 changes: 27 additions & 0 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
)

// XtrabackupEngine encapsulates the logic of the xtrabackup engine
Expand Down Expand Up @@ -370,6 +372,31 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
return replicationPosition, nil
}

// GetBackupStatus is part of the BackupEngine interface.
//
// For xtrabackup, we currently (we may want to expand this later) define a
// backup status as:
// - manifest can be read but contains invalid json => INVALID
// - the FileName in the manifest does not exist => INVALID
// - the FileName in the manifest exists => VALID
func (be *XtrabackupEngine) GetBackupStatus(ctx context.Context, bh backupstorage.BackupHandle, mfestBytes []byte) (mysqlctlpb.BackupInfo_Status, error) {
var manifest xtraBackupManifest
if err := json.Unmarshal(mfestBytes, &manifest); err != nil {
return mysqlctlpb.BackupInfo_INVALID, err
}

exists, err := bh.CheckFile(ctx, manifest.FileName)
if err != nil {
return mysqlctlpb.BackupInfo_INVALID, err
}

if !exists {
return mysqlctlpb.BackupInfo_INVALID, nil
}

return mysqlctlpb.BackupInfo_VALID, nil
}

// ExecuteRestore restores from a backup. Any error is returned.
func (be *XtrabackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) {

Expand Down
12 changes: 9 additions & 3 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/mysqlctl/mysqlctlproto"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -385,9 +386,14 @@ func (s *VtctldServer) GetBackups(ctx context.Context, req *vtctldatapb.GetBacku
bi.Shard = req.Shard

if req.Detailed {
if i >= backupsToSkipDetails { // nolint:staticcheck
// (TODO:@ajm188) Update backupengine/backupstorage implementations
// to get Status info for backups.
if i >= backupsToSkipDetails {
engine, status, err := mysqlctl.GetBackupInfo(ctx, bh)
if err != nil {
log.Warningf("error getting detailed backup info for %s/%s %s/%s: %s", bi.Keyspace, bi.Shard, bi.Directory, bi.Name, err)
}

bi.Engine = engine
bi.Status = status
}
}

Expand Down