Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support azure #212

Merged
merged 1 commit into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ milvus:

# Related configuration of minio, which is responsible for data persistence for Milvus.
minio:
cloudProvider: "minio" # remote cloud storage provider: s3, gcp, aliyun, azure

address: localhost # Address of MinIO/S3
port: 9000 # Port of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
secretAccessKey: minioadmin # MinIO/S3 encryption string
useSSL: false # Access to MinIO/S3 with SSL
useIAM: false
cloudProvider: "aws"
iamEndpoint: ""

bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
Expand Down
13 changes: 1 addition & 12 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,7 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s
zap.String("address", minioEndPoint),
zap.String("bucket", params.MinioCfg.BucketName),
zap.String("backupBucket", params.MinioCfg.BackupBucketName))
minioClient, err := storage.NewMinioChunkManager(ctx,
storage.Address(minioEndPoint),
storage.AccessKeyID(params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(params.MinioCfg.SecretAccessKey),
storage.UseSSL(params.MinioCfg.UseSSL),
storage.BucketName(params.MinioCfg.BackupBucketName),
storage.RootPath(params.MinioCfg.RootPath),
storage.CloudProvider(params.MinioCfg.CloudProvider),
storage.UseIAM(params.MinioCfg.UseIAM),
storage.IAMEndpoint(params.MinioCfg.IAMEndpoint),
storage.CreateBucket(true),
)
minioClient, err := storage.NewChunkManager(ctx, params)
return minioClient, err
}

Expand Down
2 changes: 1 addition & 1 deletion core/backup_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestListBackups(t *testing.T) {
assert.Equal(t, backupLists.GetCode(), backuppb.ResponseCode_Success)

backupListsWithCollection := backupContext.ListBackups(context, &backuppb.ListBackupsRequest{
CollectionName: "hello_milvus",
//CollectionName: "hello_milvus",
})

for _, backup := range backupListsWithCollection.GetData() {
Expand Down
20 changes: 10 additions & 10 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,9 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
if strings.Contains(err.Error(), "index not found") ||
strings.HasPrefix(err.Error(), "index doesn't exist") {
// todo
log.Warn("field has no index",
log.Info("field has no index",
zap.String("collection_name", completeCollection.Name),
zap.String("field_name", field.Name),
zap.Error(err))
zap.String("field_name", field.Name))
continue
} else {
log.Error("fail in DescribeIndex", zap.Error(err))
Expand Down Expand Up @@ -729,30 +728,31 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S
return nil
}

func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) {
func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) {
segmentBackupInfo := backuppb.SegmentBackupInfo{
SegmentId: segmentID,
CollectionId: collecitonID,
CollectionId: collectionID,
PartitionId: partitionID,
NumOfRows: numOfRows,
}
var size int64 = 0
var rootPath string

if b.params.MinioCfg.RootPath != "" {
log.Debug("params.MinioCfg.RootPath", zap.String("params.MinioCfg.RootPath", b.params.MinioCfg.RootPath))
rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath)
} else {
rootPath = ""
}

insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collecitonID, partitionID, segmentID)
log.Debug("insertPath", zap.String("insertPath", insertPath))
insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collectionID, partitionID, segmentID)
log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
if err != nil {
log.Error("Fail to list segment path", zap.String("insertPath", insertPath), zap.Error(err))
return &segmentBackupInfo, err
}
log.Debug("fieldsLogDir", zap.Any("fieldsLogDir", fieldsLogDir))
log.Debug("fieldsLogDir", zap.String("bucket", b.milvusBucketName), zap.Any("fieldsLogDir", fieldsLogDir))
insertLogs := make([]*backuppb.FieldBinlog, 0)
for _, fieldLogDir := range fieldsLogDir {
binlogPaths, sizes, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, fieldLogDir, false)
Expand All @@ -772,7 +772,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
})
}

deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collecitonID, partitionID, segmentID)
deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collectionID, partitionID, segmentID)
deltaFieldsLogDir, _, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaLogPath, false)
deltaLogs := make([]*backuppb.FieldBinlog, 0)
for _, deltaFieldLogDir := range deltaFieldsLogDir {
Expand All @@ -798,7 +798,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64,
})
}

//statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collecitonID, partitionID, segmentID)
//statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collectionID, partitionID, segmentID)
//statsFieldsLogDir, _, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsLogPath, false)
//statsLogs := make([]*backuppb.FieldBinlog, 0)
//for _, statsFieldLogDir := range statsFieldsLogDir {
Expand Down
2 changes: 2 additions & 0 deletions core/paramtable/base_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
DefaultMinioBackupBucketName = "a-bucket"
DefaultMinioBackupRootPath = "backup"

DefaultStorageType = "minio"

DefaultMilvusAddress = "localhost"
DefaultMilvusPort = "19530"
DefaultMilvusAuthorizationEnabled = "false"
Expand Down
21 changes: 17 additions & 4 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,19 @@ func (p *MilvusConfig) initTLSMode() {
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
const (
Minio = "minio"
CloudProviderAWS = "aws"
CloudProviderGCP = "gcp"
CloudProviderAliyun = "ali"
CloudProviderAzure = "azure"
)

var supportedCloudProvider = map[string]bool{
Minio: true,
CloudProviderAWS: true,
CloudProviderGCP: true,
CloudProviderAliyun: true,
CloudProviderAzure: true,
}

type MinioConfig struct {
Expand All @@ -141,11 +145,14 @@ type MinioConfig struct {

BackupBucketName string
BackupRootPath string

StorageType string
}

func (p *MinioConfig) init(base *BaseTable) {
p.Base = base

p.initStorageType()
p.initAddress()
p.initPort()
p.initAccessKeyID()
Expand All @@ -172,10 +179,7 @@ func (p *MinioConfig) initPort() {
}

func (p *MinioConfig) initAccessKeyID() {
keyID, err := p.Base.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
keyID := p.Base.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey)
p.AccessKeyID = keyID
}

Expand Down Expand Up @@ -230,6 +234,15 @@ func (p *MinioConfig) initBackupRootPath() {
p.BackupRootPath = rootPath
}

func (p *MinioConfig) initStorageType() {
engine := p.Base.LoadWithDefault("storage.type",
p.Base.LoadWithDefault("minio.type", DefaultStorageType))
if !supportedCloudProvider[engine] {
panic("unsupported storage type:" + engine)
}
p.StorageType = engine
}

type HTTPConfig struct {
Base *BaseTable

Expand Down
15 changes: 15 additions & 0 deletions core/paramtable/params_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package paramtable

import (
"testing"
)

func TestRootPathParams(t *testing.T) {
var params BackupParams
params.GlobalInitWithYaml("backup.yaml")
params.Init()

//cfg := &MinioConfig{}
//cfg.initRootPath()
println(params.MinioCfg.RootPath)
}
Loading