Skip to content

Commit

Permalink
Make dbCollections parameter compatible to both string and json struct (
Browse files Browse the repository at this point in the history
#170)

Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink authored and yelusion2 committed Oct 16, 2023
1 parent 34bde71 commit 46d22c4
Show file tree
Hide file tree
Showing 17 changed files with 470 additions and 299 deletions.
3 changes: 2 additions & 1 deletion cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/utils"
)

var (
Expand Down Expand Up @@ -55,7 +56,7 @@ var createBackupCmd = &cobra.Command{
resp := backupContext.CreateBackup(context, &backuppb.CreateBackupRequest{
BackupName: backupName,
CollectionNames: collectionNameArr,
DbCollections: dbCollections,
DbCollections: utils.WrapDBCollections(dbCollections),
})
fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
},
Expand Down
3 changes: 2 additions & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/zilliztech/milvus-backup/core"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/log"

"go.uber.org/zap"
Expand Down Expand Up @@ -71,7 +72,7 @@ var restoreBackupCmd = &cobra.Command{
CollectionNames: collectionNameArr,
CollectionSuffix: renameSuffix,
CollectionRenames: renameMap,
DbCollections: restoreDatabaseCollections,
DbCollections: utils.WrapDBCollections(restoreDatabaseCollections),
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
Expand Down
3 changes: 2 additions & 1 deletion core/backup_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/log"
"go.uber.org/zap"
"math/rand"
Expand All @@ -21,7 +22,7 @@ func TestCreateBackup(t *testing.T) {
req := &backuppb.CreateBackupRequest{
BackupName: "test_21",
//CollectionNames: []string{"hello_milvus", "hello_milvus2"},
DbCollections: "{\"db1\":[]}",
DbCollections: utils.WrapDBCollections(""),
}
backup.CreateBackup(context, req)
}
Expand Down
11 changes: 6 additions & 5 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.Crea
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.Strings("collections", request.GetCollectionNames()),
zap.String("databaseCollections", request.GetDbCollections()),
zap.String("databaseCollections", utils.GetCreateDBCollections(request)),
zap.Bool("async", request.GetAsync()))

resp := &backuppb.BackupInfoResponse{
Expand Down Expand Up @@ -139,16 +139,17 @@ type collection struct {
func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupRequest) ([]collection, error) {
log.Debug("Request collection names",
zap.Strings("request_collection_names", request.GetCollectionNames()),
zap.String("request_db_collections", request.GetDbCollections()),
zap.String("request_db_collections", utils.GetCreateDBCollections(request)),
zap.Int("length", len(request.GetCollectionNames())))
var toBackupCollections []collection

dbCollectionsStr := utils.GetCreateDBCollections(request)
// first priority: dbCollections
if request.GetDbCollections() != "" {
if dbCollectionsStr != "" {
var dbCollections DbCollections
err := jsoniter.UnmarshalFromString(request.GetDbCollections(), &dbCollections)
err := jsoniter.UnmarshalFromString(dbCollectionsStr, &dbCollections)
if err != nil {
log.Error("fail in unmarshal dbCollections in CreateBackupRequest", zap.String("dbCollections", request.GetDbCollections()), zap.Error(err))
log.Error("fail in unmarshal dbCollections in CreateBackupRequest", zap.String("dbCollections", dbCollectionsStr), zap.Error(err))
return nil, err
}
for db, collections := range dbCollections {
Expand Down
9 changes: 5 additions & 4 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
zap.Bool("async", request.GetAsync()),
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()),
zap.String("databaseCollections", request.GetDbCollections()))
zap.String("databaseCollections", utils.GetRestoreDBCollections(request)))

resp := &backuppb.RestoreBackupResponse{
RequestId: request.GetRequestId(),
Expand Down Expand Up @@ -102,11 +102,12 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
// 2, initial restoreCollectionTasks
toRestoreCollectionBackups := make([]*backuppb.CollectionBackupInfo, 0)

if request.GetDbCollections() != "" {
dbCollectionsStr := utils.GetRestoreDBCollections(request)
if dbCollectionsStr != "" {
var dbCollections DbCollections
err := jsoniter.UnmarshalFromString(request.GetDbCollections(), &dbCollections)
err := jsoniter.UnmarshalFromString(dbCollectionsStr, &dbCollections)
if err != nil {
log.Error("fail in unmarshal dbCollections in RestoreBackupRequest", zap.String("dbCollections", request.GetDbCollections()), zap.Error(err))
log.Error("fail in unmarshal dbCollections in RestoreBackupRequest", zap.String("dbCollections", dbCollectionsStr), zap.Error(err))
errorMsg := fmt.Sprintf("fail in unmarshal dbCollections in RestoreBackupRequest, dbCollections: %s, err: %s", request.GetDbCollections(), err)
log.Error(errorMsg)
resp.Code = backuppb.ResponseCode_Fail
Expand Down
24 changes: 16 additions & 8 deletions core/backup_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ func wrapHandler(handle handlerFunc) gin.HandlerFunc {
// @Success 200 {object} backuppb.BackupInfoResponse
// @Router /create [post]
func (h *Handlers) handleCreateBackup(c *gin.Context) (interface{}, error) {
json := backuppb.CreateBackupRequest{}
c.BindJSON(&json)
json.RequestId = c.GetHeader("request_id")
resp := h.backupContext.CreateBackup(h.backupContext.ctx, &json)
requestBody := backuppb.CreateBackupRequest{}
if err := c.ShouldBindJSON(&requestBody); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
return nil, nil
}
requestBody.RequestId = c.GetHeader("request_id")
resp := h.backupContext.CreateBackup(h.backupContext.ctx, &requestBody)
if h.backupContext.params.HTTPCfg.SimpleResponse {
resp = SimpleBackupResponse(resp)
}
Expand Down Expand Up @@ -216,10 +219,15 @@ func (h *Handlers) handleDeleteBackup(c *gin.Context) (interface{}, error) {
// @Success 200 {object} backuppb.RestoreBackupResponse
// @Router /restore [post]
func (h *Handlers) handleRestoreBackup(c *gin.Context) (interface{}, error) {
json := backuppb.RestoreBackupRequest{}
c.BindJSON(&json)
json.RequestId = c.GetHeader("request_id")
resp := h.backupContext.RestoreBackup(h.backupContext.ctx, &json)
requestBody := backuppb.RestoreBackupRequest{}
//c.BindJSON(&json)
if err := c.ShouldBindJSON(&requestBody); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
return nil, nil
}

requestBody.RequestId = c.GetHeader("request_id")
resp := h.backupContext.RestoreBackup(h.backupContext.ctx, &requestBody)
if h.backupContext.params.HTTPCfg.SimpleResponse {
resp = SimpleRestoreResponse(resp)
}
Expand Down
76 changes: 11 additions & 65 deletions core/milvus_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,76 +243,22 @@ func TestDescribeIndex(t *testing.T) {
fmt.Println(err)
}

func TestMultiDB(t *testing.T) {
func TestCleanAll(t *testing.T) {
ctx := context.Background()
milvusAddr := "localhost:19530"
//c, err := proxy.NewClient(context, milvusAddr)
//assert.NoError(t, err)
milvusAddr := "10.102.9.64:19530"

c2, err := gomilvus.NewGrpcClient(ctx, milvusAddr)
assert.NoError(t, err)

c2.UsingDatabase(ctx, "wanganyang")
//c2.CreateDatabase(ctx, "wanganyang")

//_COLLECTION_NAME := "demo_bulk_insert2"
//_ID_FIELD_NAME := "id_field"
//_VECTOR_FIELD_NAME := "float_vector_field"
//_STR_FIELD_NAME := "str_field"
//
//// String field parameter
//_MAX_LENGTH := "65535"
//
//// Vector field parameter
//_DIM := "8"

//field1 = FieldSchema(name=_ID_FIELD_NAME, dtype=DataType.INT64, description="int64", is_primary=True, auto_id=True)
//field2 = FieldSchema(name=_VECTOR_FIELD_NAME, dtype=DataType.FLOAT_VECTOR, description="float vector", dim=_DIM,
// is_primary=False)
//field3 = FieldSchema(name=_STR_FIELD_NAME, dtype=DataType.VARCHAR, description="string",
// max_length=_MAX_LENGTH, is_primary=False)
//schema = CollectionSchema(fields=[field1, field2, field3], description="collection description")
//collection = Collection(name=_COLLECTION_NAME, data=None, schema=schema)

//field1 := &entity.Field{
// Name: _ID_FIELD_NAME,
// DataType: entity.FieldTypeInt64,
// Description: "int64",
// PrimaryKey: true,
// AutoID: true,
//}
//field2 := &entity.Field{
// Name: _VECTOR_FIELD_NAME,
// DataType: entity.FieldTypeFloatVector,
// Description: "float vector",
// TypeParams: map[string]string{
// entity.TypeParamDim: _DIM,
// },
// PrimaryKey: false,
//}
//field3 := &entity.Field{
// Name: _STR_FIELD_NAME,
// DataType: entity.FieldTypeVarChar,
// Description: "string",
// PrimaryKey: false,
// TypeParams: map[string]string{
// entity.TypeParamMaxLength: _MAX_LENGTH,
// },
//}
//schema := &entity.Schema{
// CollectionName: _COLLECTION_NAME,
// Description: "demo bulkinsert",
// AutoID: true,
// Fields: []*entity.Field{field1, field2, field3},
//}
////client.DropCollection(ctx, _COLLECTION_NAME)
//c2.CreateCollection(ctx, schema, 2)
dbs, err := c2.ListDatabases(ctx)

collections, err := c2.ListCollections(ctx)
for _, coll := range collections {
log.Info("collections", zap.Any("coll", coll.Name), zap.Int64("id", coll.ID))
for _, db := range dbs {
c2.UsingDatabase(ctx, db.Name)
collections, _ := c2.ListCollections(ctx)
for _, coll := range collections {
c2.DropCollection(ctx, coll.Name)
log.Info("collections", zap.Any("coll", coll.Name), zap.Int64("id", coll.ID))
}
c2.DropDatabase(ctx, db.Name)
}

dbs, err := c2.ListDatabases(ctx)
log.Info("dbs", zap.Any("dbs", dbs))
}
6 changes: 4 additions & 2 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
syntax = "proto3";
package milvus.proto.backup;

import "google/protobuf/struct.proto";

option go_package="github.com/zilliztech/milvus-backup/core/proto/backuppb";

service MilvusBackupService {
Expand Down Expand Up @@ -142,7 +144,7 @@ message CreateBackupRequest {
// async or not
bool async = 4;
// database and collections to backup. A json string. To support database. 2023.7.7
string db_collections = 5;
google.protobuf.Value db_collections = 5;
}

/**
Expand Down Expand Up @@ -242,7 +244,7 @@ message RestoreBackupRequest {
// if bucket_name and path is set. will override bucket/path in config.
string path = 8;
// database and collections to restore. A json string. To support database. 2023.7.7
string db_collections = 9;
google.protobuf.Value db_collections = 9;
}

message RestorePartitionTask {
Expand Down
Loading

0 comments on commit 46d22c4

Please sign in to comment.