Skip to content

Commit

Permalink
Support cross Azure container backup
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Oct 20, 2023
1 parent c94d9b1 commit cd1aeda
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 59 deletions.
4 changes: 4 additions & 0 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ minio:
bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
rootPath: "files" # Milvus storage root path in MinIO/S3, make it the same as your milvus instance

# only for azure
backupAccessKeyID: minioadmin # access key ID (storage account name) for the backup destination
backupSecretAccessKey: minioadmin # secret/account key for the backup destination

backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data will store to backupBucketName/backupRootPath
backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath

Expand Down
18 changes: 16 additions & 2 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ type MinioConfig struct {
CloudProvider string
IAMEndpoint string

BackupBucketName string
BackupRootPath string
BackupAccessKeyID string
BackupSecretAccessKey string
BackupBucketName string
BackupRootPath string

StorageType string
}
Expand All @@ -186,6 +188,8 @@ func (p *MinioConfig) init(base *BaseTable) {
p.initCloudProvider()
p.initIAMEndpoint()

p.initBackupAccessKeyID()
p.initBackupSecretAccessKey()
p.initBackupBucketName()
p.initBackupRootPath()
}
Expand Down Expand Up @@ -246,6 +250,16 @@ func (p *MinioConfig) initIAMEndpoint() {
p.IAMEndpoint = iamEndpoint
}

// initBackupAccessKeyID loads minio.backupAccessKeyID from the config table,
// falling back to DefaultMinioAccessKey when the key is not set.
func (p *MinioConfig) initBackupAccessKeyID() {
	p.BackupAccessKeyID = p.Base.LoadWithDefault("minio.backupAccessKeyID", DefaultMinioAccessKey)
}

// initBackupSecretAccessKey loads minio.backupSecretAccessKey from the config
// table, falling back to DefaultMinioSecretAccessKey when the key is not set.
func (p *MinioConfig) initBackupSecretAccessKey() {
	p.BackupSecretAccessKey = p.Base.LoadWithDefault("minio.backupSecretAccessKey", DefaultMinioSecretAccessKey)
}

func (p *MinioConfig) initBackupBucketName() {
bucketName := p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName)
p.BackupBucketName = bucketName
Expand Down
42 changes: 21 additions & 21 deletions core/storage/azure_chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ import (

// AzureChunkManager is responsible for reading and writing data stored in Azure Blob Storage.
type AzureChunkManager struct {
client *AzureObjectStorage
aos *AzureObjectStorage

//cli *azblob.Client
// ctx context.Context
bucketName string
rootPath string
//bucketName string
//rootPath string
}

var _ ChunkManager = (*AzureChunkManager)(nil)

func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, error) {
client, err := newAzureObjectStorageWithConfig(ctx, c)
aos, err := newAzureObjectStorageWithConfig(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -58,19 +58,19 @@ func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, e
// return nil, err
//}
mcm := &AzureChunkManager{
client: client,
aos: aos,
//cli: cli,
bucketName: c.bucketName,
rootPath: strings.TrimLeft(c.rootPath, "/"),
//bucketName: c.bucketName,
//rootPath: strings.TrimLeft(c.rootPath, "/"),
}
log.Info("Azure chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", mcm.RootPath()))
log.Info("Azure chunk manager init success.")
return mcm, nil
}

// RootPath returns minio root path.
func (mcm *AzureChunkManager) RootPath() string {
return mcm.rootPath
}
//func (mcm *AzureChunkManager) RootPath() string {
// return mcm.rootPath
//}

func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error {
objectkeys, _, err := mcm.ListWithPrefix(ctx, fromBucketName, fromPath, true)
Expand All @@ -80,7 +80,7 @@ func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, t
}
for _, objectkey := range objectkeys {
dstObjectKey := strings.Replace(objectkey, fromPath, toPath, 1)
err := mcm.client.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey)
err := mcm.aos.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey)
if err != nil {
log.Error("copyObject error", zap.String("srcObjectKey", objectkey), zap.String("dstObjectKey", dstObjectKey), zap.Error(err))
return err
Expand Down Expand Up @@ -148,12 +148,12 @@ func (mcm *AzureChunkManager) MultiWrite(ctx context.Context, bucketName string,

// Exist checks whether chunk is saved to minio storage.
func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) {
_, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
_, err := mcm.getObjectSize(ctx, bucketName, filePath)
if err != nil {
if IsErrNoSuchKey(err) {
return false, nil
}
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
return false, err
}
return true, nil
Expand All @@ -163,7 +163,7 @@ func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, file
func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, filePath string) ([]byte, error) {
object, err := mcm.getObject(ctx, bucketName, filePath, int64(0), int64(0))
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
log.Warn("failed to get object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
defer object.Close()
Expand All @@ -179,7 +179,7 @@ func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, fileP
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
return nil, err
}
size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
size, err := mcm.getObjectSize(ctx, bucketName, filePath)
if err != nil {
log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -395,7 +395,7 @@ func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectN
//}
//return resp.Body, nil

reader, err := mcm.client.GetObject(ctx, bucketName, objectName, offset, size)
reader, err := mcm.aos.GetObject(ctx, bucketName, objectName, offset, size)
switch err := err.(type) {
case *azcore.ResponseError:
if err.ErrorCode == string(bloberror.BlobNotFound) {
Expand All @@ -410,12 +410,12 @@ func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectN
}

func (mcm *AzureChunkManager) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {
err := mcm.client.PutObject(ctx, bucketName, objectName, reader, objectSize)
err := mcm.aos.PutObject(ctx, bucketName, objectName, reader, objectSize)
return err
}

func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, objectName string) (int64, error) {
info, err := mcm.client.StatObject(ctx, bucketName, objectName)
info, err := mcm.aos.StatObject(ctx, bucketName, objectName)

switch err := err.(type) {
case *azcore.ResponseError:
Expand All @@ -432,11 +432,11 @@ func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, obj
}

func (mcm *AzureChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) {
res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
res, err := mcm.aos.ListObjects(ctx, bucketName, prefix, recursive)
return res, err
}

func (mcm *AzureChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error {
err := mcm.client.RemoveObject(ctx, bucketName, objectName)
err := mcm.aos.RemoveObject(ctx, bucketName, objectName)
return err
}
98 changes: 65 additions & 33 deletions core/storage/azure_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,58 @@ import (
"github.com/zilliztech/milvus-backup/internal/util/retry"
)

type AzureObjectStorage struct {
Client *service.Client
config *config
// innerAzureClient bundles one Azure blob service client with the
// credentials and bucket (container) it was constructed for.
type innerAzureClient struct {
	client *service.Client // blob service client for this storage account

	bucketName        string // container this client serves
	accessKeyID       string // storage account name; also used to build blob endpoint URLs
	secretAccessKeyID string // account key — presumably unused when IAM auth is enabled; TODO confirm
	createBucket      bool   // when true, create the container on first access if it does not exist
}

type AzureClient struct {
cli *azblob.Client
// AzureObjectStorage routes object operations to one of several Azure
// clients, keyed by bucket (container) name. It is populated with the
// source bucket and the backup bucket, enabling cross-account copy.
type AzureObjectStorage struct {
	//Client *service.Client
	// clients maps bucket name -> the client configured for that bucket.
	clients map[string]*innerAzureClient
	//config *config
}

func NewAzureClient(ctx context.Context, cfg *config) (*azblob.Client, error) {
cred, err := azblob.NewSharedKeyCredential(cfg.accessKeyID, cfg.secretAccessKeyID)
//func NewAzureClient(ctx context.Context, cfg *config) (*azblob.Client, error) {
// cred, err := azblob.NewSharedKeyCredential(cfg.accessKeyID, cfg.secretAccessKeyID)
// if err != nil {
// return nil, fmt.Errorf("storage: new azure shared key credential %w", err)
// }
// endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.accessKeyID)
// cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil)
// if err != nil {
// return nil, fmt.Errorf("storage: new azure aos %w", err)
// }
//
// return cli, nil
//}

func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObjectStorage, error) {
client, err := newAzureObjectClient(ctx, c.address, c.accessKeyID, c.secretAccessKeyID, c.bucketName, c.useIAM, c.createBucket)
if err != nil {
return nil, fmt.Errorf("storage: new azure shared key credential %w", err)
return nil, err
}
endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.accessKeyID)
cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil)
backupClient, err := newAzureObjectClient(ctx, c.address, c.backupAccessKeyID, c.backupSecretAccessKeyID, c.backupBucketName, c.useIAM, c.createBucket)
if err != nil {
return nil, fmt.Errorf("storage: new azure client %w", err)
return nil, err
}

return cli, nil
clients := map[string]*innerAzureClient{
c.bucketName: client,
c.backupBucketName: backupClient,
}
return &AzureObjectStorage{
clients: clients,
//config: c,
}, nil
}

func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObjectStorage, error) {
func newAzureObjectClient(ctx context.Context, address, accessKeyID, secretAccessKeyID, bucketName string, useIAM, createBucket bool) (*innerAzureClient, error) {
var client *service.Client
var err error
if c.useIAM {
if useIAM {
cred, credErr := azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{
ClientID: os.Getenv("AZURE_CLIENT_ID"),
TenantID: os.Getenv("AZURE_TENANT_ID"),
Expand All @@ -68,29 +93,26 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje
if credErr != nil {
return nil, credErr
}
client, err = service.NewClient("https://"+c.accessKeyID+".blob."+c.address+"/", cred, &service.ClientOptions{})
client, err = service.NewClient("https://"+accessKeyID+".blob."+address+"/", cred, &service.ClientOptions{})
} else {
connectionString := os.Getenv("AZURE_STORAGE_CONNECTION_STRING")
if connectionString == "" {
connectionString = "DefaultEndpointsProtocol=https;AccountName=" + c.accessKeyID +
";AccountKey=" + c.secretAccessKeyID + ";EndpointSuffix=" + c.address
}
connectionString := "DefaultEndpointsProtocol=https;AccountName=" + accessKeyID +
";AccountKey=" + secretAccessKeyID + ";EndpointSuffix=" + address
client, err = service.NewClientFromConnectionString(connectionString, &service.ClientOptions{})
}
if err != nil {
return nil, err
}
if c.bucketName == "" {
if bucketName == "" {
return nil, fmt.Errorf("invalid bucket name")
}
// check valid in first query
checkBucketFn := func() error {
_, err := client.NewContainerClient(c.bucketName).GetProperties(ctx, &container.GetPropertiesOptions{})
_, err := client.NewContainerClient(bucketName).GetProperties(ctx, &container.GetPropertiesOptions{})
if err != nil {
switch err := err.(type) {
case *azcore.ResponseError:
if c.createBucket && err.ErrorCode == string(bloberror.ContainerNotFound) {
_, createErr := client.NewContainerClient(c.bucketName).Create(ctx, &azblob.CreateContainerOptions{})
if createBucket && err.ErrorCode == string(bloberror.ContainerNotFound) {
_, createErr := client.NewContainerClient(bucketName).Create(ctx, &azblob.CreateContainerOptions{})
if createErr != nil {
return createErr
}
Expand All @@ -104,7 +126,17 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje
if err != nil {
return nil, err
}
return &AzureObjectStorage{Client: client, config: c}, nil
return &innerAzureClient{
client: client,
bucketName: bucketName,
accessKeyID: accessKeyID,
secretAccessKeyID: secretAccessKeyID,
createBucket: createBucket,
}, nil
}

// getClient returns the blob service client registered for bucketName.
// NOTE(review): ctx is accepted but unused, and an unregistered bucketName
// yields a nil map entry, so the .client access panics with a nil pointer
// dereference — callers must pass only the bucket names wired up at
// construction time. Confirm whether a guarded lookup is wanted instead.
func (aos *AzureObjectStorage) getClient(ctx context.Context, bucketName string) *service.Client {
	return aos.clients[bucketName].client
}

func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
Expand All @@ -115,7 +147,7 @@ func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, object
Count: size,
}
}
object, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
object, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)

if err != nil {
return nil, err
Expand All @@ -124,20 +156,20 @@ func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, object
}

func (aos *AzureObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {
_, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).UploadStream(ctx, reader, &azblob.UploadStreamOptions{})
_, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).UploadStream(ctx, reader, &azblob.UploadStreamOptions{})
return err
}

func (aos *AzureObjectStorage) StatObject(ctx context.Context, bucketName, objectName string) (int64, error) {
info, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).GetProperties(ctx, &blob.GetPropertiesOptions{})
info, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).GetProperties(ctx, &blob.GetPropertiesOptions{})
if err != nil {
return 0, err
}
return *info.ContentLength, nil
}

func (aos *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) {
pager := aos.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
pager := aos.clients[bucketName].client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix,
})
// pager := aos.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Expand All @@ -158,12 +190,12 @@ func (aos *AzureObjectStorage) ListObjects(ctx context.Context, bucketName strin
}

func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {
_, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).Delete(ctx, &blob.DeleteOptions{})
_, err := aos.clients[bucketName].client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).Delete(ctx, &blob.DeleteOptions{})
return err
}

func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error {
fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.config.accessKeyID, fromBucketName, fromPath)
_, err := aos.Client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
_, err := aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
return err
}
8 changes: 7 additions & 1 deletion core/storage/chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@ func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.Backu
c.accessKeyID = params.MinioCfg.AccessKeyID
c.secretAccessKeyID = params.MinioCfg.SecretAccessKey
c.useSSL = params.MinioCfg.UseSSL
c.bucketName = params.MinioCfg.BackupBucketName
c.bucketName = params.MinioCfg.BucketName
c.rootPath = params.MinioCfg.RootPath
c.cloudProvider = params.MinioCfg.CloudProvider
c.storageEngine = params.MinioCfg.StorageType
c.useIAM = params.MinioCfg.UseIAM
c.iamEndpoint = params.MinioCfg.IAMEndpoint
c.createBucket = true

c.backupAccessKeyID = params.MinioCfg.BackupAccessKeyID
c.backupSecretAccessKeyID = params.MinioCfg.BackupSecretAccessKey
c.backupBucketName = params.MinioCfg.BackupBucketName
c.backupRootPath = params.MinioCfg.BackupRootPath

return NewAzureChunkManager(ctx, c)
}
7 changes: 6 additions & 1 deletion core/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ type config struct {
rootPath string
useIAM bool
iamEndpoint string

cloudProvider string
storageEngine string

backupAccessKeyID string
backupSecretAccessKeyID string
backupBucketName string
backupRootPath string
}

func newDefaultConfig() *config {
Expand Down
Loading

0 comments on commit cd1aeda

Please sign in to comment.