Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pass database in create and restore request #155

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
Expand All @@ -14,6 +15,8 @@ import (
var (
backupName string
collectionNames string
databases string
dbCollections string
)

var createBackupCmd = &cobra.Command{
Expand All @@ -35,17 +38,34 @@ var createBackupCmd = &cobra.Command{
} else {
collectionNameArr = strings.Split(collectionNames, ",")
}

if dbCollections == "" && databases != "" {
dbCollectionDict := make(map[string][]string)
splits := strings.Split(databases, ",")
for _, db := range splits {
dbCollectionDict[db] = []string{}
}
completeDbCollections, err := jsoniter.MarshalToString(dbCollectionDict)
dbCollections = completeDbCollections
if err != nil {
fmt.Println("illegal databases input")
return
}
}
resp := backupContext.CreateBackup(context, &backuppb.CreateBackupRequest{
BackupName: backupName,
CollectionNames: collectionNameArr,
DbCollections: dbCollections,
})
fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
},
}

func init() {
createBackupCmd.Flags().StringVarP(&backupName, "name", "n", "", "backup name, if unset will generate a name automatically")
createBackupCmd.Flags().StringVarP(&collectionNames, "colls", "", "", "collectionNames to backup, use ',' to connect multiple collections")
createBackupCmd.Flags().StringVarP(&collectionNames, "colls", "c", "", "collectionNames to backup, use ',' to connect multiple collections")
createBackupCmd.Flags().StringVarP(&databases, "databases", "d", "", "databases to backup")
createBackupCmd.Flags().StringVarP(&dbCollections, "database_collections", "f", "", "databases and collections to backup, json format: {\"db1\":[\"c1\", \"c2\"],\"db2\":[]}")

rootCmd.AddCommand(createBackupCmd)
}
27 changes: 23 additions & 4 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
Expand All @@ -15,10 +16,12 @@ import (
)

var (
restoreBackupName string
restoreCollectionNames string
renameSuffix string
renameCollectionNames string
restoreBackupName string
restoreCollectionNames string
renameSuffix string
renameCollectionNames string
restoreDatabases string
restoreDatabaseCollections string
)

var restoreBackupCmd = &cobra.Command{
Expand Down Expand Up @@ -50,11 +53,25 @@ var restoreBackupCmd = &cobra.Command{
}
}

if restoreDatabaseCollections == "" && restoreDatabases != "" {
dbCollectionDict := make(map[string][]string)
splits := strings.Split(restoreDatabases, ",")
for _, db := range splits {
dbCollectionDict[db] = []string{}
}
completeDbCollections, err := jsoniter.MarshalToString(dbCollectionDict)
restoreDatabaseCollections = completeDbCollections
if err != nil {
fmt.Println("illegal databases input")
return
}
}
resp := backupContext.RestoreBackup(context, &backuppb.RestoreBackupRequest{
BackupName: restoreBackupName,
CollectionNames: collectionNameArr,
CollectionSuffix: renameSuffix,
CollectionRenames: renameMap,
DbCollections: restoreDatabaseCollections,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
Expand All @@ -66,6 +83,8 @@ func init() {
restoreBackupCmd.Flags().StringVarP(&restoreCollectionNames, "collections", "c", "", "collectionNames to restore")
restoreBackupCmd.Flags().StringVarP(&renameSuffix, "suffix", "s", "", "add a suffix to collection name to restore")
restoreBackupCmd.Flags().StringVarP(&renameCollectionNames, "rename", "r", "", "rename collections to new names")
restoreBackupCmd.Flags().StringVarP(&restoreDatabases, "databases", "d", "", "databases to restore, if not set, restore all databases")
restoreBackupCmd.Flags().StringVarP(&restoreDatabaseCollections, "database_collections", "f", "", "databases and collections to restore, json format: {\"db1\":[\"c1\", \"c2\"],\"db2\":[]}")

rootCmd.AddCommand(restoreBackupCmd)
}
39 changes: 39 additions & 0 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"go.uber.org/zap"

Expand All @@ -25,6 +26,7 @@ func (b BackupContext) CreateBackup(ctx context.Context, request *backuppb.Creat
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.Strings("collections", request.GetCollectionNames()),
zap.String("databaseCollections", request.GetDbCollections()),
zap.Bool("async", request.GetAsync()))

resp := &backuppb.BackupInfoResponse{
Expand Down Expand Up @@ -130,11 +132,48 @@ type collection struct {
collectionName string
}

// parse collections to backup
// For backward compatibility:
// 1,parse dbCollections first,
// 2,if dbCollections not set, use collectionNames
func (b BackupContext) parseBackupCollections(request *backuppb.CreateBackupRequest) ([]collection, error) {
log.Debug("Request collection names",
zap.Strings("request_collection_names", request.GetCollectionNames()),
zap.Int("length", len(request.GetCollectionNames())))
var toBackupCollections []collection

// first priority: dbCollections
if request.GetDbCollections() != "" {
var dbCollections DbCollections
err := jsoniter.UnmarshalFromString(request.GetDbCollections(), &dbCollections)
if err != nil {
log.Error("fail in unmarshal dbCollections in CreateBackupRequest", zap.String("dbCollections", request.GetDbCollections()), zap.Error(err))
return nil, err
}
for db, collections := range dbCollections {
if len(collections) == 0 {
err := b.milvusClient.UsingDatabase(b.ctx, db)
if err != nil {
log.Error("fail to call SDK use database", zap.Error(err))
return nil, err
}
collections, err := b.milvusClient.ListCollections(b.ctx)
if err != nil {
log.Error("fail in ListCollections", zap.Error(err))
return nil, err
}
for _, coll := range collections {
toBackupCollections = append(toBackupCollections, collection{db, coll.Name})
}
} else {
for _, coll := range collections {
toBackupCollections = append(toBackupCollections, collection{db, coll})
}
}
}
return toBackupCollections, nil
}

if request.GetCollectionNames() == nil || len(request.GetCollectionNames()) == 0 {
dbs, err := b.milvusClient.ListDatabases(b.ctx)
if err != nil {
Expand Down
36 changes: 34 additions & 2 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/cockroachdb/errors"
jsoniter "github.com/json-iterator/go"
gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"go.uber.org/zap"
Expand All @@ -29,7 +30,8 @@ func (b BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Rest
zap.Any("CollectionRenames", request.GetCollectionRenames()),
zap.Bool("async", request.GetAsync()),
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()))
zap.String("path", request.GetPath()),
zap.String("databaseCollections", request.GetDbCollections()))

resp := &backuppb.RestoreBackupResponse{
RequestId: request.GetRequestId(),
Expand Down Expand Up @@ -99,7 +101,36 @@ func (b BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Rest

// 2, initial restoreCollectionTasks
toRestoreCollectionBackups := make([]*backuppb.CollectionBackupInfo, 0)
if len(request.GetCollectionNames()) == 0 {

if request.GetDbCollections() != "" {
var dbCollections DbCollections
err := jsoniter.UnmarshalFromString(request.GetDbCollections(), &dbCollections)
if err != nil {
log.Error("fail in unmarshal dbCollections in RestoreBackupRequest", zap.String("dbCollections", request.GetDbCollections()), zap.Error(err))
errorMsg := fmt.Sprintf("fail in unmarshal dbCollections in RestoreBackupRequest, dbCollections: %s, err: %s", request.GetDbCollections(), err)
log.Error(errorMsg)
resp.Code = backuppb.ResponseCode_Fail
resp.Msg = errorMsg
return resp
}
for db, collections := range dbCollections {
if len(collections) == 0 {
for _, collectionBackup := range backup.GetCollectionBackups() {
if collectionBackup.GetDbName() == db {
toRestoreCollectionBackups = append(toRestoreCollectionBackups, collectionBackup)
}
}
} else {
for _, coll := range collections {
for _, collectionBackup := range backup.GetCollectionBackups() {
if collectionBackup.GetDbName() == db && collectionBackup.CollectionName == coll {
toRestoreCollectionBackups = append(toRestoreCollectionBackups, collectionBackup)
}
}
}
}
}
} else if len(request.GetCollectionNames()) == 0 {
toRestoreCollectionBackups = backup.GetCollectionBackups()
} else {
collectionNameDict := make(map[string]bool)
Expand Down Expand Up @@ -139,6 +170,7 @@ func (b BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Rest
targetCollectionName = backupCollectionName
}

b.milvusClient.UsingDatabase(ctx, restoreCollection.DbName)
exist, err := b.milvusClient.HasCollection(ctx, targetCollectionName)
if err != nil {
errorMsg := fmt.Sprintf("fail to check whether the collection is exist, collection_name: %s, err: %s", targetCollectionName, err)
Expand Down
4 changes: 4 additions & 0 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ func SimpleBackupResponse(input *backuppb.BackupInfoResponse) *backuppb.BackupIn
collections = append(collections, &backuppb.CollectionBackupInfo{
StateCode: coll.GetStateCode(),
ErrorMessage: coll.GetErrorMessage(),
DbName: coll.GetDbName(),
CollectionName: coll.GetCollectionName(),
CollectionId: coll.GetCollectionId(),
BackupTimestamp: coll.GetBackupTimestamp(),
HasIndex: coll.GetHasIndex(),
IndexInfos: coll.GetIndexInfos(),
Expand Down Expand Up @@ -338,3 +340,5 @@ func UpdateRestoreBackupTask(input *backuppb.RestoreBackupTask) *backuppb.Restor
}
return input
}

type DbCollections = map[string][]string
92 changes: 92 additions & 0 deletions core/backup_meta_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package core

import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"testing"

jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/assert"

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand Down Expand Up @@ -139,3 +144,90 @@ func TestBackupSerialize(t *testing.T) {
deserBackup, err := deserialize(serData)
log.Info(deserBackup.String())
}

func TestDbCollectionJson(t *testing.T) {
dbCollection := DbCollections{"db1": []string{"coll1", "coll2"}, "db2": []string{"coll3", "coll4"}}
jsonStr, err := jsoniter.MarshalToString(dbCollection)
assert.NoError(t, err)
println(jsonStr)

var dbCollection2 DbCollections
jsoniter.UnmarshalFromString(jsonStr, &dbCollection2)
println(dbCollection2)
}

func readBackup(backupDir string) (*backuppb.BackupInfo, error) {
readByteFunc := func(filepath string) ([]byte, error) {
file, err := os.OpenFile(filepath, os.O_RDWR, 0666)
if err != nil {
fmt.Println("Open file error!", err)
return nil, err
}

// Get the file size
stat, err := file.Stat()
if err != nil {
fmt.Println(err)
return nil, err
}

bs := make([]byte, stat.Size())
_, err = bufio.NewReader(file).Read(bs)
if err != nil && err != io.EOF {
fmt.Println(err)
return nil, err
}
return bs, nil
}

backupPath := backupDir + "/backup_meta.json"
collectionPath := backupDir + "/collection_meta.json"
partitionPath := backupDir + "/partition_meta.json"
segmentPath := backupDir + "/segment_meta.json"

backupMetaBytes, err := readByteFunc(backupPath)
if err != nil {
return nil, err
}
collectionBackupMetaBytes, err := readByteFunc(collectionPath)
if err != nil {
return nil, err
}
partitionBackupMetaBytes, err := readByteFunc(partitionPath)
if err != nil {
return nil, err
}
segmentBackupMetaBytes, err := readByteFunc(segmentPath)
if err != nil {
return nil, err
}

completeBackupMetas := &BackupMetaBytes{
BackupMetaBytes: backupMetaBytes,
CollectionMetaBytes: collectionBackupMetaBytes,
PartitionMetaBytes: partitionBackupMetaBytes,
SegmentMetaBytes: segmentBackupMetaBytes,
}

deserBackup, err := deserialize(completeBackupMetas)

return deserBackup, err
}

func TestReadBackupFile(t *testing.T) {
filepath := "/tmp/hxs_meta"

backupInfo, err := readBackup(filepath)
assert.NoError(t, err)

levelBackupInfo, err := treeToLevel(backupInfo)
assert.NoError(t, err)
assert.NotNil(t, levelBackupInfo)

output, _ := serialize(backupInfo)
BackupMetaStr := string(output.BackupMetaBytes)
segmentMetaStr := string(output.SegmentMetaBytes)
fmt.Sprintf(BackupMetaStr)
fmt.Sprintf(segmentMetaStr)
//log.Info("segment meta", zap.String("value", string(output.SegmentMetaBytes)))
}
4 changes: 4 additions & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ message CreateBackupRequest {
repeated string collection_names = 3;
// async or not
bool async = 4;
// database and collections to backup. A json string. To support database. 2023.7.7
string db_collections = 5;
}

/**
Expand Down Expand Up @@ -239,6 +241,8 @@ message RestoreBackupRequest {
string bucket_name = 7;
// if bucket_name and path is set. will override bucket/path in config.
string path = 8;
// database and collections to restore. A json string. To support database. 2023.7.7
string db_collections = 9;
}

message RestorePartitionTask {
Expand Down
Loading