Skip to content

Commit

Permalink
Refine cross storage backup and add doc
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Aug 22, 2024
1 parent 9129e4a commit 5e83b64
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 39 deletions.
11 changes: 6 additions & 5 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ minio:
backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data will store to backupBucketName/backupRootPath
backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath

# If you need to back up or restore data between two different storage systems, direct client-side copying is not supported.
# Set this option to true to enable data transfer through Milvus Backup.
# Note: This option will be automatically set to true if `minio.storageType` and `minio.backupStorageType` differ.
# However, if they are the same but belong to different services, you must manually set this option to `true`.
crossStorage: "false"

backup:
maxSegmentGroupSize: 2G

Expand All @@ -61,8 +67,3 @@ backup:
enable: true
seconds: 7200
address: http://localhost:9091

# If you need to backup or restore data between two different storage systems,
# direct client-side copying is not supported.
# Set this option to true to enable data transfer through Milvus Backup.
copyByServer: "false"
32 changes: 22 additions & 10 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"path"
"sync"
"time"

Expand Down Expand Up @@ -217,28 +218,37 @@ func (b *BackupContext) getBackupStorageClient() storage.ChunkManager {
}

func (b *BackupContext) getBackupCopier() *storage.Copier {
crossStorage := b.params.MinioCfg.CrossStorage
if b.getBackupStorageClient().Config().StorageType != b.getMilvusStorageClient().Config().StorageType {
crossStorage = true
}
if b.backupCopier == nil {
b.backupCopier = storage.NewCopier(
b.getMilvusStorageClient(),
b.getBackupStorageClient(),
storage.CopyOption{
WorkerNum: b.params.BackupCfg.BackupCopyDataParallelism,
RPS: RPS,
CopyByServer: b.params.BackupCfg.CopyByServer,
CopyByServer: crossStorage,
})
}
return b.backupCopier
}

func (b *BackupContext) getRestoreCopier() *storage.Copier {
crossStorage := b.params.MinioCfg.CrossStorage
// force set copyByServer is true if two storage type is different
if b.getBackupStorageClient().Config().StorageType != b.getMilvusStorageClient().Config().StorageType {
crossStorage = true
}
if b.restoreCopier == nil {
b.restoreCopier = storage.NewCopier(
b.getBackupStorageClient(),
b.getMilvusStorageClient(),
storage.CopyOption{
WorkerNum: b.params.BackupCfg.BackupCopyDataParallelism,
RPS: RPS,
CopyByServer: b.params.BackupCfg.CopyByServer,
CopyByServer: crossStorage,
})
}
return b.restoreCopier
Expand Down Expand Up @@ -670,36 +680,38 @@ func (b *BackupContext) Check(ctx context.Context) string {
"backup-rootpath: %s\n",
version, b.milvusBucketName, b.milvusRootPath, b.backupBucketName, b.backupRootPath)

paths, _, err := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR, false)
milvusFiles, _, err := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR, false)
if err != nil {
return "Failed to connect to storage milvus path\n" + info + err.Error()
}

if len(paths) == 0 {
if len(milvusFiles) == 0 {
return "Milvus storage is empty. Please verify whether your cluster is really empty. If not, the configs(minio address, port, bucket, rootPath) may be wrong\n" + info
}

paths, _, err = b.getBackupStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
_, _, err = b.getBackupStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
if err != nil {
return "Failed to connect to storage backup path " + info + err.Error()
}

CHECK_PATH := "milvus_backup_check_" + time.Now().String()
checkSrcPath := path.Join(b.milvusRootPath, "milvus_backup_check_src_"+string(time.Now().Unix()))
checkDstPath := path.Join(b.backupRootPath, "milvus_backup_check_dst_"+string(time.Now().Unix()))

err = b.getMilvusStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1})
err = b.getMilvusStorageClient().Write(ctx, b.milvusBucketName, checkSrcPath, []byte{1})
if err != nil {
return "Failed to connect to storage milvus path\n" + info + err.Error()
}
defer func() {
b.getMilvusStorageClient().Remove(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH)
b.getMilvusStorageClient().Remove(ctx, b.milvusBucketName, checkSrcPath)
}()

err = b.getMilvusStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, b.backupRootPath+SEPERATOR+CHECK_PATH)
log.Debug("check copy", zap.String("srcBucket", b.milvusBucketName), zap.String("destBucket", b.backupBucketName), zap.String("key", checkSrcPath), zap.String("destKey", checkDstPath))
err = b.getBackupCopier().Copy(ctx, checkSrcPath, checkDstPath, b.milvusBucketName, b.backupBucketName)
if err != nil {
return "Failed to copy file from milvus storage to backup storage\n" + info + err.Error()
}
defer func() {
b.getBackupStorageClient().Remove(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+CHECK_PATH)
b.getBackupStorageClient().Remove(ctx, b.backupBucketName, checkDstPath)
}()

return "Succeed to connect to milvus and storage.\n" + info
Expand Down
11 changes: 5 additions & 6 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/zap"

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/log"
"github.com/zilliztech/milvus-backup/internal/util/retry"
Expand Down Expand Up @@ -856,8 +855,8 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
}

err = retry.Do(ctx, func() error {
attr := storage.ObjectAttr{Key: binlog.GetLogPath()}
return b.getBackupCopier().Copy(ctx, attr, targetPath, b.milvusBucketName, b.backupBucketName)
path := binlog.GetLogPath()
return b.getBackupCopier().Copy(ctx, path, targetPath, b.milvusBucketName, b.backupBucketName)
}, retry.Sleep(2*time.Second), retry.Attempts(5))
if err != nil {
log.Info("Fail to copy file after retry",
Expand Down Expand Up @@ -901,8 +900,8 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
return errors.New("Binlog file not exist " + binlog.GetLogPath())
}
err = retry.Do(ctx, func() error {
attr := storage.ObjectAttr{Key: binlog.GetLogPath()}
return b.getBackupCopier().Copy(ctx, attr, targetPath, b.milvusBucketName, b.backupBucketName)
path := binlog.GetLogPath()
return b.getBackupCopier().Copy(ctx, path, targetPath, b.milvusBucketName, b.backupBucketName)
}, retry.Sleep(2*time.Second), retry.Attempts(5))
if err != nil {
log.Info("Fail to copy file after retry",
Expand Down Expand Up @@ -1076,7 +1075,7 @@ func (b *BackupContext) backupRBAC(ctx context.Context, backupInfo *backuppb.Bac
Roles: roles,
Grants: grants,
}

log.Info("backup RBAC", zap.Int("users", len(users)), zap.Int("roles", len(roles)), zap.Int("grants", len(grants)))
b.meta.UpdateBackup(backupInfo.Id, setRBACMeta(rbacPb))
return nil
Expand Down
4 changes: 1 addition & 3 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.uber.org/zap"

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
Expand Down Expand Up @@ -590,8 +589,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
tempFilekey := path.Join(tempDir, strings.Replace(file, b.params.MinioCfg.BackupRootPath, "", 1))
log.Debug("Copy temporary restore file", zap.String("from", file), zap.String("to", tempFilekey))
err := retry.Do(ctx, func() error {
attr := storage.ObjectAttr{Key: file}
return b.getRestoreCopier().Copy(ctx, attr, tempFilekey, backupBucketName, b.milvusBucketName)
return b.getRestoreCopier().Copy(ctx, file, tempFilekey, backupBucketName, b.milvusBucketName)
}, retry.Sleep(2*time.Second), retry.Attempts(5))
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket after retry", zap.Error(err))
Expand Down
25 changes: 13 additions & 12 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ type BackupConfig struct {
GcPauseEnable bool
GcPauseSeconds int
GcPauseAddress string

CopyByServer bool
}

func (p *BackupConfig) init(base *BaseTable) {
Expand All @@ -58,7 +56,6 @@ func (p *BackupConfig) init(base *BaseTable) {
p.initGcPauseEnable()
p.initGcPauseSeconds()
p.initGcPauseAddress()
p.initCopyByServer()
}

func (p *BackupConfig) initMaxSegmentGroupSize() {
Expand Down Expand Up @@ -104,15 +101,6 @@ func (p *BackupConfig) initGcPauseAddress() {
p.GcPauseAddress = address
}

func (p *BackupConfig) initCopyByServer() {
copyByServer := p.Base.LoadWithDefault("backup.copyByServer", "false")
var err error
p.CopyByServer, err = strconv.ParseBool(copyByServer)
if err != nil {
panic("parse bool CopyByServer:" + err.Error())
}
}

type MilvusConfig struct {
Base *BaseTable

Expand Down Expand Up @@ -229,6 +217,8 @@ type MinioConfig struct {
BackupRootPath string
BackupUseIAM bool
BackupIAMEndpoint string

CrossStorage bool
}

func (p *MinioConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -256,6 +246,8 @@ func (p *MinioConfig) init(base *BaseTable) {
p.initBackupRootPath()
p.initBackupUseIAM()
p.initBackupIAMEndpoint()

p.initCrossStorage()
}

func (p *MinioConfig) initAddress() {
Expand Down Expand Up @@ -400,6 +392,15 @@ func (p *MinioConfig) initBackupRootPath() {
p.BackupRootPath = rootPath
}

func (p *MinioConfig) initCrossStorage() {
crossStorage := p.Base.LoadWithDefault("backup.crossStorage", "false")
var err error
p.CrossStorage, err = strconv.ParseBool(crossStorage)
if err != nil {
panic("parse bool CrossStorage:" + err.Error())
}
}

type HTTPConfig struct {
Base *BaseTable

Expand Down
6 changes: 3 additions & 3 deletions core/storage/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ func (c *Copier) CopyPrefix(ctx context.Context, i CopyPathInput) error {
return nil
}

func (c *Copier) Copy(ctx context.Context, attr ObjectAttr, destPrefix, srcBucket, destBucket string) error {
func (c *Copier) Copy(ctx context.Context, srcPrefix, destPrefix, srcBucket, destBucket string) error {
fn := c.selectCopyFn()
srcAttrs, err := c.getAttrs(ctx, srcBucket, attr.Key, "")
srcAttrs, err := c.getAttrs(ctx, srcBucket, srcPrefix, "")
if err != nil {
return fmt.Errorf("storage: copier get src attrs %w", err)
}
for _, srcAttr := range srcAttrs {
destKey := strings.Replace(srcAttr.Key, attr.Key, destPrefix, 1)
destKey := strings.Replace(srcAttr.Key, srcPrefix, destPrefix, 1)
err := fn(ctx, srcAttr, destKey, srcBucket, destBucket)
if err != nil {
return err
Expand Down
92 changes: 92 additions & 0 deletions docs/cross_storage_backup_restore.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Cross storage backup & restore

Previously, Milvus-backup utilized the Copy API of the storage client to back up data.
This limited the backup capability to the same storage type as the Milvus cluster.
However, there's a significant demand for cross-storage backups—for instance,
backup data from Minio to a local disk or backup from in-house storage to cloud storage.

Starting from version v0.4.21, Milvus-backup now supports cross-storage backups.
In this process, data is read from the source storage and written to the target storage through the Milvus-backup service.

This feature is currently in Beta. `azure` is not supported. Not all storage types are fully tested.

## Usage

To enable cross-storage backup, you only need to adjust the configurations in backup.yaml.

You can use `./milvus-backup check` first to see if the cross copy is working.

For example

*Back up data from Minio to a local disk*:

```yaml
# Related configuration of minio, which is responsible for data persistence for Milvus.
minio:
storageType: "minio"
address: localhost
port: 9000
accessKeyID: minioadmin
secretAccessKey: minioadmin
bucketName: "a-bucket"
rootPath: "files"

backupStorageType: "local"
backupRootPath: "/root/backup/"
```
*Backup from Minio to S3*
```yaml
minio:
storageType: "minio"
address: localhost
port: 9000
accessKeyID: minioadmin
secretAccessKey: minioadmin
useSSL: false
useIAM: false
iamEndpoint: ""
bucketName: "a-bucket"
rootPath: "files"

backupStorageType: "s3"
backupAddress: s3Address
backupPort: 443
backupAccessKeyID: s3AccessKey
backupSecretAccessKey: s3SecretAccessKey
backupBucketName: "s3-bucket"
backupRootPath: "s3-backup-path"
```
*Backup from Minio A to Minio B*
If the two storage locations are of the same type but belong to different services,
you need to add an additional configuration crossStorage=true to explicitly indicate that it is a cross-storage backup or restore operation.
```yaml
minio:
storageType: "minio"
address: addressA
port: 9000
accessKeyID: userA
secretAccessKey: passwdB
useSSL: false
useIAM: false
iamEndpoint: ""
bucketName: "a-bucket"
rootPath: "files"

backupStorageType: "minio"
backupAddress: addressB
backupPort: 9000
backupAccessKeyID: userB
backupSecretAccessKey: passwdB
backupBucketName: "b-bucket"
backupRootPath: "backup"

# If you need to back up or restore data between two different storage systems, direct client-side copying is not supported.
# Set this option to true to enable data transfer through Milvus Backup.
# Note: This option will be automatically set to true if `minio.storageType` and `minio.backupStorageType` differ.
# However, if they are the same but belong to different services, you must manually set this option to `true`.
crossStorage: "true"
```

0 comments on commit 5e83b64

Please sign in to comment.