Skip to content

Commit

Permalink
GCS support
Browse files Browse the repository at this point in the history
Azure issues
Milvus certificate authentication (will be remoced later)
Signed-off-by: Gifi Siby <gifi.s@ibm.com>
  • Loading branch information
gifi-siby committed Nov 11, 2024
1 parent c9ae3a4 commit a1ea501
Show file tree
Hide file tree
Showing 16 changed files with 1,061 additions and 409 deletions.
59 changes: 47 additions & 12 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"

grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
Expand Down Expand Up @@ -72,7 +75,33 @@ func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (go
if params.MilvusCfg.TLSMode == 0 {
c, err = gomilvus.NewDefaultGrpcClientWithAuth(ctx, milvusEndpoint, params.MilvusCfg.User, params.MilvusCfg.Password)
} else if params.MilvusCfg.TLSMode == 1 || params.MilvusCfg.TLSMode == 2 {
c, err = gomilvus.NewDefaultGrpcClientWithTLSAuth(ctx, milvusEndpoint, params.MilvusCfg.User, params.MilvusCfg.Password)
// GIFI
var creds credentials.TransportCredentials
//c, err = gomilvus.NewDefaultGrpcClientWithTLSAuth(ctx, milvusEndpoint, params.MilvusCfg.User, params.MilvusCfg.Password)
if params.MilvusCfg.TLSMode == 1 {
//log.Debug("Start Milvus client", zap.String("TLSCertPath", params.MilvusCfg.TLSCertPath), zap.String("ServerName", params.MilvusCfg.ServerName))
creds, err = credentials.NewClientTLSFromFile(params.MilvusCfg.TLSCertPath, params.MilvusCfg.ServerName)
} else {
// creds, err = withCredential(opt.ClientPemPath, opt.ClientKeyPath, opt.CaPath)
log.Error("milvus.tlsMode 2 is not supported")
return nil, errors.New("milvus.TLSMode supports 0 and 1 currently")
}
if err != nil {
log.Error("failed to create client from the certificate", zap.Error(err))
return nil, err
}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}

c, err = gomilvus.NewClient(ctx, gomilvus.Config{
Address: milvusEndpoint,
Username: params.MilvusCfg.User,
Password: params.MilvusCfg.Password,
EnableTLSAuth: true,
DialOptions: opts,
})

} else {
log.Error("milvus.TLSMode is not illegal, support value 0, 1, 2")
return nil, errors.New("milvus.TLSMode is not illegal, support value 0, 1, 2")
Expand Down Expand Up @@ -101,11 +130,13 @@ func createStorageClient(ctx context.Context, params paramtable.BackupParams) (s
BucketName: params.MinioCfg.BucketName,
AccessKeyID: params.MinioCfg.AccessKeyID,
SecretAccessKeyID: params.MinioCfg.SecretAccessKey,
UseSSL: params.MinioCfg.UseSSL,
UseIAM: params.MinioCfg.UseIAM,
IAMEndpoint: params.MinioCfg.IAMEndpoint,
RootPath: params.MinioCfg.RootPath,
CreateBucket: true,
GcpCredentialJSON: params.MinioCfg.GcpCredentialJSON,

UseSSL: params.MinioCfg.UseSSL,
UseIAM: params.MinioCfg.UseIAM,
IAMEndpoint: params.MinioCfg.IAMEndpoint,
RootPath: params.MinioCfg.RootPath,
CreateBucket: true,
}

minioClient, err := storage.NewChunkManager(ctx, params, storageConfig)
Expand Down Expand Up @@ -157,6 +188,7 @@ func (b *BackupContext) getMilvusClient() *MilvusClient {

func (b *BackupContext) getMilvusStorageClient() storage.ChunkManager {
if b.milvusStorageClient == nil {
//log.Info("GIFI INSIDE getMilvusStorageClient")
minioEndPoint := b.params.MinioCfg.Address + ":" + b.params.MinioCfg.Port
log.Debug("create milvus storage client",
zap.String("address", minioEndPoint),
Expand All @@ -169,6 +201,7 @@ func (b *BackupContext) getMilvusStorageClient() storage.ChunkManager {
BucketName: b.params.MinioCfg.BucketName,
AccessKeyID: b.params.MinioCfg.AccessKeyID,
SecretAccessKeyID: b.params.MinioCfg.SecretAccessKey,
GcpCredentialJSON: b.params.MinioCfg.GcpCredentialJSON,
UseSSL: b.params.MinioCfg.UseSSL,
UseIAM: b.params.MinioCfg.UseIAM,
IAMEndpoint: b.params.MinioCfg.IAMEndpoint,
Expand All @@ -188,6 +221,7 @@ func (b *BackupContext) getMilvusStorageClient() storage.ChunkManager {

func (b *BackupContext) getBackupStorageClient() storage.ChunkManager {
if b.backupStorageClient == nil {
//log.Info("GIFI INSIDE getBackupStorageClient")
minioEndPoint := b.params.MinioCfg.BackupAddress + ":" + b.params.MinioCfg.BackupPort
log.Debug("create backup storage client",
zap.String("address", minioEndPoint),
Expand All @@ -200,6 +234,7 @@ func (b *BackupContext) getBackupStorageClient() storage.ChunkManager {
BucketName: b.params.MinioCfg.BackupBucketName,
AccessKeyID: b.params.MinioCfg.BackupAccessKeyID,
SecretAccessKeyID: b.params.MinioCfg.BackupSecretAccessKey,
GcpCredentialJSON: b.params.MinioCfg.BackupGcpCredentialJSON,
UseSSL: b.params.MinioCfg.BackupUseSSL,
UseIAM: b.params.MinioCfg.BackupUseIAM,
IAMEndpoint: b.params.MinioCfg.BackupIAMEndpoint,
Expand Down Expand Up @@ -647,13 +682,13 @@ func (b *BackupContext) GetRestore(ctx context.Context, request *backuppb.GetRes
}

task := b.meta.GetRestoreTask(request.GetId())
progress := int32(float32(task.GetRestoredSize()) * 100 / float32(task.GetToRestoreSize()))
// don't return zero
if progress == 0 {
progress = 1
}
task.Progress = progress
if task != nil {
progress := int32(float32(task.GetRestoredSize()) * 100 / float32(task.GetToRestoreSize()))
// don't return zero
if progress == 0 {
progress = 1
}
task.Progress = progress
resp.Code = backuppb.ResponseCode_Success
resp.Msg = "success"
resp.Data = task
Expand Down
78 changes: 34 additions & 44 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"strings"
"time"

"github.com/golang/protobuf/proto"
jsoniter "github.com/json-iterator/go"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/samber/lo"
Expand Down Expand Up @@ -237,7 +236,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
}
fields := make([]*backuppb.FieldSchema, 0)
for _, field := range completeCollection.Schema.Fields {
fieldBak := &backuppb.FieldSchema{
fields = append(fields, &backuppb.FieldSchema{
FieldID: field.ID,
Name: field.Name,
IsPrimaryKey: field.PrimaryKey,
Expand All @@ -248,18 +247,8 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
IndexParams: utils.MapToKVPair(field.IndexParams),
IsDynamic: field.IsDynamic,
IsPartitionKey: field.IsPartitionKey,
Nullable: field.Nullable,
ElementType: backuppb.DataType(field.ElementType),
}
defaultValue := field.DefaultValue
if defaultValue != nil {
bytes, err := proto.Marshal(field.DefaultValue)
if err != nil {
return err
}
fieldBak.DefaultValueProto = string(bytes)
}
fields = append(fields, fieldBak)
})
}
schema := &backuppb.CollectionSchema{
Name: completeCollection.Schema.CollectionName,
Expand Down Expand Up @@ -481,7 +470,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
}

newSegIDs := lo.Map(unfilledSegments, func(segment *entity.Segment, _ int) int64 { return segment.ID })
log.Debug("Finished fill segment",
log.Info("Finished fill segment",
zap.String("databaseName", collectionBackup.GetDbName()),
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int64s("segments", newSegIDs))
Expand Down Expand Up @@ -540,6 +529,7 @@ func (b *BackupContext) backupCollectionExecute(ctx context.Context, collectionB
log.Info("backupCollectionExecute", zap.Any("collectionMeta", collectionBackup.String()))
backupInfo := b.meta.GetBackupByCollectionID(collectionBackup.GetCollectionId())
backupBinlogPath := BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())

for _, partition := range b.meta.GetPartitions(collectionBackup.CollectionId) {
segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0)
var currentSize int64 = 0
Expand Down Expand Up @@ -698,25 +688,10 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
for _, collection := range toBackupCollections {
collectionClone := collection
job := func(ctx context.Context) error {
retryForSpecificError := func(retries int, delay time.Duration) error {
for i := 0; i < retries; i++ {
err := b.backupCollectionPrepare(ctx, backupInfo, collectionClone, request.GetForce())
// If no error, return successfully
if err == nil {
return nil
}
// Retry only for the specific error
if strings.Contains(err.Error(), "rate limit exceeded") {
fmt.Printf("Attempt %d: Temporary error occurred, retrying...\n", i+1)
time.Sleep(delay)
continue
}
// Return immediately for any other error
return err
}
return fmt.Errorf("operation failed after %d retries", retries)
}
return retryForSpecificError(10, 10*time.Second)
err := retry.Do(ctx, func() error {
return b.backupCollectionPrepare(ctx, backupInfo, collectionClone, request.GetForce())
}, retry.Sleep(120*time.Second), retry.Attempts(3))
return err
}
jobId := b.getBackupCollectionWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
Expand Down Expand Up @@ -767,7 +742,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
backupInfo.ErrorMessage = err.Error()
return err
}
log.Info("finish backup all collections",
log.Info("finish executeCreateBackup",
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.Strings("collections", request.GetCollectionNames()),
Expand Down Expand Up @@ -819,9 +794,11 @@ func (b *BackupContext) writeBackupInfoMeta(ctx context.Context, id string) erro

func (b *BackupContext) copySegments(ctx context.Context, backupBinlogPath string, segmentIDs []int64) error {
jobIds := make([]int64, 0)
// log.Debug("GIFI-copySegments before", zap.Any("segmentIDs", segmentIDs))
for _, v := range segmentIDs {
segmentID := v
segment := b.meta.GetSegment(segmentID)
// log.Debug("GIFI-copySegments After", zap.Any("segment", segment))
job := func(ctx context.Context) error {
return b.copySegment(ctx, backupBinlogPath, segment)
}
Expand All @@ -838,33 +815,41 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
zap.Int64("partition_id", segment.GetPartitionId()),
zap.Int64("segment_id", segment.GetSegmentId()),
zap.Int64("group_id", segment.GetGroupId()))
log.Info("copy segment", zap.String("backupBinlogPath", backupBinlogPath))

// generate target path
// milvus_rootpath/insert_log/collection_id/partition_id/segment_id/ =>
// backup_rootpath/backup_name/binlog/insert_log/collection_id/partition_id/group_id/segment_id
backupPathFunc := func(binlogPath, rootPath, backupBinlogPath string) string {
// log.Debug("GIFI-backupPathFunc", zap.String("binlogPath", binlogPath))
// log.Debug("GIFI-backupPathFunc", zap.String("rootPath", rootPath))
// log.Debug("GIFI-backupPathFunc", zap.String("backupBinlogPath", backupBinlogPath))

if rootPath == "" {
return backupBinlogPath + SEPERATOR + binlogPath
} else {
return strings.Replace(binlogPath, rootPath, backupBinlogPath, 1)
checkvar := strings.Replace(binlogPath, rootPath, backupBinlogPath, 1)
//log.Debug("GIFI-backupPathFunc", zap.String("checkvar", checkvar))
return checkvar
}
}
// insert log
for _, binlogs := range segment.GetBinlogs() {
// log.Debug("GIFI INSIDE copySegment first loop")
for _, binlog := range binlogs.GetBinlogs() {
targetPath := backupPathFunc(binlog.GetLogPath(), b.milvusRootPath, backupBinlogPath)
// use segmentID as group id
//log.Debug("GIFI INSIDE copySegment", zap.String("targetPath", targetPath))
// B
segment.GroupId = segment.SegmentId
if segment.GetGroupId() != 0 {
targetPath = strings.Replace(targetPath,
strconv.FormatInt(segment.GetPartitionId(), 10),
strconv.FormatInt(segment.GetPartitionId(), 10)+"/"+strconv.FormatInt(segment.GetGroupId(), 10),
1)
}
//log.Debug("GIFI INSIDE copySegment", zap.String("targetPath", targetPath))
if targetPath == binlog.GetLogPath() {
return errors.New(fmt.Sprintf("copy src path and dst path can not be the same, src: %s dst: %s", binlog.GetLogPath(), targetPath))
}

//binlog := binlog
exist, err := b.getMilvusStorageClient().Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
if err != nil {
Expand All @@ -879,11 +864,13 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
zap.String("file", binlog.GetLogPath()))
return err
}

err = retry.Do(ctx, func() error {
path := binlog.GetLogPath()
return b.getBackupCopier().Copy(ctx, path, targetPath, b.milvusBucketName, b.backupBucketName)
// log.Info("TEST GIFI", zap.String("path", path))
erro := b.getBackupCopier().Copy(ctx, path, targetPath, b.milvusBucketName, b.backupBucketName)
return erro
}, retry.Sleep(2*time.Second), retry.Attempts(5))

if err != nil {
log.Info("Fail to copy file after retry",
zap.Error(err),
Expand Down Expand Up @@ -949,15 +936,15 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackupInfo *backuppb.SegmentBackupInfo) error {
var size int64 = 0
var rootPath string

log.Debug("GIFI INSIDE fillSegmentBackupInfo")
if b.params.MinioCfg.RootPath != "" {
rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath)
} else {
rootPath = ""
}

insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", segmentBackupInfo.GetCollectionId(), segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId())
log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
log.Debug("GIFI insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
fieldsLogDir, _, err := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
// handle segment level
isL0 := false
Expand All @@ -968,12 +955,16 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup
log.Error("Fail to list segment path", zap.String("insertPath", insertPath), zap.Error(err))
return err
}
log.Debug("fieldsLogDir", zap.String("bucket", b.milvusBucketName), zap.Any("fieldsLogDir", fieldsLogDir))
log.Debug("GIFI INSIDE fillSegmentBackupInfo", zap.Any("fieldsLogDir", fieldsLogDir))
insertLogs := make([]*backuppb.FieldBinlog, 0)
for _, fieldLogDir := range fieldsLogDir {
binlogPaths, sizes, _ := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, fieldLogDir, false)
fieldIdStr := strings.Replace(strings.Replace(fieldLogDir, insertPath, "", 1), SEPERATOR, "", -1)
fieldId, _ := strconv.ParseInt(fieldIdStr, 10, 64)
log.Debug("GIFI fillSegmentBackupInfo", zap.Any("fieldLogDir", fieldLogDir))
log.Debug("GIFI fillSegmentBackupInfo", zap.Any("binlogPaths", binlogPaths))
log.Debug("GIFI fillSegmentBackupInfo", zap.Any("fieldIdStr", fieldIdStr))
log.Debug("GIFI fillSegmentBackupInfo", zap.Any("fieldId", fieldId))
binlogs := make([]*backuppb.Binlog, 0)
for index, binlogPath := range binlogPaths {
binlogs = append(binlogs, &backuppb.Binlog{
Expand Down Expand Up @@ -1039,7 +1030,6 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup
segmentBackupInfo.Size = size
segmentBackupInfo.IsL0 = isL0
b.meta.UpdateSegment(segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId(), setSegmentBinlogs(insertLogs), setSegmentDeltaBinlogs(deltaLogs), setSegmentSize(size), setSegmentL0(isL0))
log.Debug("fill segment info", zap.Int64("segId", segmentBackupInfo.GetSegmentId()), zap.Int64("size", size))
return nil
}

Expand Down
Loading

0 comments on commit a1ea501

Please sign in to comment.