diff --git a/configs/backup.yaml b/configs/backup.yaml index ffc67ab1..91d1975a 100644 --- a/configs/backup.yaml +++ b/configs/backup.yaml @@ -20,14 +20,20 @@ milvus: password: "Milvus" # Related configuration of minio, which is responsible for data persistence for Milvus. +# Deprecated, use storage instead minio: +# Object storage config of milvus +storage: + # deprecated, use type instead + cloudProvider: "minio" # remote cloud storage provider: s3, gcp, aliyun, azure + type: minio # object storage using by milvus: (local,) minio, s3, gcp, aliyun, azure + address: localhost # Address of MinIO/S3 port: 9000 # Port of MinIO/S3 - accessKeyID: minioadmin # accessKeyID of MinIO/S3 + accessKeyID: minioadmin # accessKeyID of MinIO/S3 secretAccessKey: minioadmin # MinIO/S3 encryption string useSSL: false # Access to MinIO/S3 with SSL useIAM: false - cloudProvider: "aws" iamEndpoint: "" bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance diff --git a/core/backup_context.go b/core/backup_context.go index abef2e1b..ebe18958 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -83,18 +83,7 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s zap.String("address", minioEndPoint), zap.String("bucket", params.MinioCfg.BucketName), zap.String("backupBucket", params.MinioCfg.BackupBucketName)) - minioClient, err := storage.NewMinioChunkManager(ctx, - storage.Address(minioEndPoint), - storage.AccessKeyID(params.MinioCfg.AccessKeyID), - storage.SecretAccessKeyID(params.MinioCfg.SecretAccessKey), - storage.UseSSL(params.MinioCfg.UseSSL), - storage.BucketName(params.MinioCfg.BackupBucketName), - storage.RootPath(params.MinioCfg.RootPath), - storage.CloudProvider(params.MinioCfg.CloudProvider), - storage.UseIAM(params.MinioCfg.UseIAM), - storage.IAMEndpoint(params.MinioCfg.IAMEndpoint), - storage.CreateBucket(true), - ) + minioClient, err := storage.NewChunkManager(ctx, params) return minioClient, err } diff --git a/core/backup_context_test.go b/core/backup_context_test.go index 7a90c72c..7ec9845f 100644 --- a/core/backup_context_test.go +++ b/core/backup_context_test.go @@ -38,7 +38,7 @@ func TestListBackups(t *testing.T) { assert.Equal(t, backupLists.GetCode(), backuppb.ResponseCode_Success) backupListsWithCollection := backupContext.ListBackups(context, &backuppb.ListBackupsRequest{ - CollectionName: "hello_milvus", + //CollectionName: "hello_milvus", }) for _, backup := range backupListsWithCollection.GetData() { diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go index 33040056..4e076359 100644 --- a/core/backup_impl_create_backup.go +++ b/core/backup_impl_create_backup.go @@ -288,12 +288,12 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup //} fieldIndex, err := b.getMilvusClient().DescribeIndex(b.ctx, completeCollection.Name, field.Name) if err != nil { - if strings.HasPrefix(err.Error(), "index doesn't exist") { + if strings.Contains(err.Error(), "index not found") || + strings.HasPrefix(err.Error(), "index doesn't exist") { // todo - log.Warn("field has no index", + log.Info("field has no index", zap.String("collection_name", completeCollection.Name), - zap.String("field_name", field.Name), - zap.Error(err)) + zap.String("field_name", field.Name)) continue } else { log.Error("fail in DescribeIndex", zap.Error(err)) @@ -728,10 +728,10 @@ func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.S return nil } -func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) { +func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, numOfRows int64) (*backuppb.SegmentBackupInfo, error) { segmentBackupInfo := backuppb.SegmentBackupInfo{ SegmentId: segmentID, - CollectionId: collecitonID, + CollectionId: collectionID, PartitionId: partitionID, NumOfRows: numOfRows, } @@ -739,19 +739,20 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64, var rootPath string if b.params.MinioCfg.RootPath != "" { + log.Debug("params.MinioCfg.RootPath", zap.String("params.MinioCfg.RootPath", b.params.MinioCfg.RootPath)) rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath) } else { rootPath = "" } - insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collecitonID, partitionID, segmentID) - log.Debug("insertPath", zap.String("insertPath", insertPath)) + insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", collectionID, partitionID, segmentID) + log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath)) fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false) if err != nil { log.Error("Fail to list segment path", zap.String("insertPath", insertPath), zap.Error(err)) return &segmentBackupInfo, err } - log.Debug("fieldsLogDir", zap.Any("fieldsLogDir", fieldsLogDir)) + log.Debug("fieldsLogDir", zap.String("bucket", b.milvusBucketName), zap.Any("fieldsLogDir", fieldsLogDir)) insertLogs := make([]*backuppb.FieldBinlog, 0) for _, fieldLogDir := range fieldsLogDir { binlogPaths, sizes, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, fieldLogDir, false) @@ -771,7 +772,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64, }) } - deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collecitonID, partitionID, segmentID) + deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", collectionID, partitionID, segmentID) deltaFieldsLogDir, _, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaLogPath, false) deltaLogs := make([]*backuppb.FieldBinlog, 0) for _, deltaFieldLogDir := range deltaFieldsLogDir { @@ -797,7 +798,7 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collecitonID int64, }) } - //statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collecitonID, partitionID, segmentID) + //statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collectionID, partitionID, segmentID) //statsFieldsLogDir, _, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsLogPath, false) //statsLogs := make([]*backuppb.FieldBinlog, 0) //for _, statsFieldLogDir := range statsFieldsLogDir { diff --git a/core/paramtable/base_table.go b/core/paramtable/base_table.go index 27aad793..340489ff 100644 --- a/core/paramtable/base_table.go +++ b/core/paramtable/base_table.go @@ -53,6 +53,8 @@ const ( DefaultMinioBackupBucketName = "a-bucket" DefaultMinioBackupRootPath = "backup" + DefaultStorageType = "minio" + DefaultMilvusAddress = "localhost" DefaultMilvusPort = "19530" DefaultMilvusAuthorizationEnabled = "false" diff --git a/core/paramtable/params.go b/core/paramtable/params.go index 4cd6f580..464ab3cf 100644 --- a/core/paramtable/params.go +++ b/core/paramtable/params.go @@ -114,15 +114,19 @@ func (p *MilvusConfig) initTLSMode() { // ///////////////////////////////////////////////////////////////////////////// // --- minio --- const ( + Minio = "minio" CloudProviderAWS = "aws" CloudProviderGCP = "gcp" CloudProviderAliyun = "ali" + CloudProviderAzure = "azure" ) var supportedCloudProvider = map[string]bool{ + Minio: true, CloudProviderAWS: true, CloudProviderGCP: true, CloudProviderAliyun: true, + CloudProviderAzure: true, } type MinioConfig struct { @@ -141,11 +145,14 @@ type MinioConfig struct { BackupBucketName string BackupRootPath string + + StorageType string } func (p *MinioConfig) init(base *BaseTable) { p.Base = base + p.initStorageType() p.initAddress() p.initPort() p.initAccessKeyID() @@ -154,7 +161,7 @@ func (p *MinioConfig) init(base *BaseTable) { p.initBucketName() p.initRootPath() p.initUseIAM() - p.initCloudProvider() + //p.initCloudProvider() p.initIAMEndpoint() p.initBackupBucketName() @@ -162,45 +169,50 @@ func (p *MinioConfig) init(base *BaseTable) { } func (p *MinioConfig) initAddress() { - endpoint := p.Base.LoadWithDefault("minio.address", DefaultMinioAddress) + endpoint := p.Base.LoadWithDefault("storage.address", + p.Base.LoadWithDefault("minio.address", DefaultMinioAddress)) p.Address = endpoint } func (p *MinioConfig) initPort() { - port := p.Base.LoadWithDefault("minio.port", DefaultMinioPort) + port := p.Base.LoadWithDefault("storage.port", + p.Base.LoadWithDefault("minio.port", DefaultMinioPort)) p.Port = port } func (p *MinioConfig) initAccessKeyID() { - keyID, err := p.Base.Load("minio.accessKeyID") - if err != nil { - panic(err) - } + keyID := p.Base.LoadWithDefault("storage.accessKeyID", + p.Base.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey)) p.AccessKeyID = keyID } func (p *MinioConfig) initSecretAccessKey() { - key := p.Base.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey) + key := p.Base.LoadWithDefault("storage.secretAccessKey", + p.Base.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey)) p.SecretAccessKey = key } func (p *MinioConfig) initUseSSL() { - usessl := p.Base.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL) + usessl := p.Base.LoadWithDefault("storage.useSSL", + p.Base.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL)) p.UseSSL, _ = strconv.ParseBool(usessl) } func (p *MinioConfig) initBucketName() { - bucketName := p.Base.LoadWithDefault("minio.bucketName", DefaultMinioBucketName) + bucketName := p.Base.LoadWithDefault("storage.bucketName", + p.Base.LoadWithDefault("minio.bucketName", DefaultMinioBucketName)) p.BucketName = bucketName } func (p *MinioConfig) initRootPath() { - rootPath := p.Base.LoadWithDefault("minio.rootPath", DefaultMinioRootPath) + rootPath := p.Base.LoadWithDefault("storage.rootPath", + p.Base.LoadWithDefault("minio.rootPath", DefaultMinioRootPath)) p.RootPath = rootPath } func (p *MinioConfig) initUseIAM() { - useIAM := p.Base.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM) + useIAM := p.Base.LoadWithDefault("storage.useIAM", + p.Base.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM)) var err error p.UseIAM, err = strconv.ParseBool(useIAM) if err != nil { @@ -208,28 +220,40 @@ func (p *MinioConfig) initUseIAM() { } } -func (p *MinioConfig) initCloudProvider() { - p.CloudProvider = p.Base.LoadWithDefault("minio.cloudProvider", DefaultMinioCloudProvider) - if !supportedCloudProvider[p.CloudProvider] { - panic("unsupported cloudProvider:" + p.CloudProvider) - } -} +//func (p *MinioConfig) initCloudProvider() { +// p.CloudProvider = p.Base.LoadWithDefault("minio.cloudProvider", DefaultMinioCloudProvider) +// if !supportedCloudProvider[p.CloudProvider] { +// panic("unsupported cloudProvider:" + p.CloudProvider) +// } +//} func (p *MinioConfig) initIAMEndpoint() { - iamEndpoint := p.Base.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint) + iamEndpoint := p.Base.LoadWithDefault("storage.iamEndpoint", + p.Base.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint)) p.IAMEndpoint = iamEndpoint } func (p *MinioConfig) initBackupBucketName() { - bucketName := p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName) + bucketName := p.Base.LoadWithDefault("storage.backupBucketName", + p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName)) p.BackupBucketName = bucketName } func (p *MinioConfig) initBackupRootPath() { - rootPath := p.Base.LoadWithDefault("minio.backupRootPath", DefaultMinioBackupRootPath) + rootPath := p.Base.LoadWithDefault("storage.backupRootPath", + p.Base.LoadWithDefault("minio.backupRootPath", DefaultMinioBackupRootPath)) p.BackupRootPath = rootPath } +func (p *MinioConfig) initStorageType() { + engine := p.Base.LoadWithDefault("storage.type", + p.Base.LoadWithDefault("minio.type", DefaultStorageType)) + if !supportedCloudProvider[engine] { + panic("unsupported storage type:" + engine) + } + p.StorageType = engine +} + type HTTPConfig struct { Base *BaseTable diff --git a/core/paramtable/params_test.go b/core/paramtable/params_test.go new file mode 100644 index 00000000..88d3dff3 --- /dev/null +++ b/core/paramtable/params_test.go @@ -0,0 +1 @@ +package paramtable diff --git a/core/storage/azure_chunk_manager.go b/core/storage/azure_chunk_manager.go new file mode 100644 index 00000000..9d774cf9 --- /dev/null +++ b/core/storage/azure_chunk_manager.go @@ -0,0 +1,442 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "bytes" + "context" + "fmt" + "io" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/cockroachdb/errors" + "github.com/minio/minio-go/v7" + "go.uber.org/zap" + "golang.org/x/exp/mmap" + "golang.org/x/sync/errgroup" + + "github.com/zilliztech/milvus-backup/internal/log" + "github.com/zilliztech/milvus-backup/internal/util/errorutil" +) + +// AzureChunkManager is responsible for read and write data stored in minio. +type AzureChunkManager struct { + client *AzureObjectStorage + + //cli *azblob.Client + // ctx context.Context + bucketName string + rootPath string +} + +var _ ChunkManager = (*AzureChunkManager)(nil) + +func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, error) { + client, err := newAzureObjectStorageWithConfig(ctx, c) + if err != nil { + return nil, err + } + + //cli, err := NewAzureClient(ctx, c) + //if err != nil { + // return nil, err + //} + mcm := &AzureChunkManager{ + client: client, + //cli: cli, + bucketName: c.bucketName, + rootPath: strings.TrimLeft(c.rootPath, "/"), + } + log.Info("Azure chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", mcm.RootPath())) + return mcm, nil +} + +// RootPath returns minio root path. +func (mcm *AzureChunkManager) RootPath() string { + return mcm.rootPath +} + +func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error { + objectkeys, _, err := mcm.ListWithPrefix(ctx, fromBucketName, fromPath, true) + if err != nil { + log.Warn("listWithPrefix error", zap.String("prefix", fromPath), zap.Error(err)) + return err + } + for _, objectkey := range objectkeys { + dstObjectKey := strings.Replace(objectkey, fromPath, toPath, 1) + err := mcm.client.CopyObject(ctx, fromBucketName, toBucketName, objectkey, dstObjectKey) + if err != nil { + log.Error("copyObject error", zap.String("srcObjectKey", objectkey), zap.String("dstObjectKey", dstObjectKey), zap.Error(err)) + return err + } + } + return nil +} + +// Path returns the path of minio data if exists. +func (mcm *AzureChunkManager) Path(ctx context.Context, bucketName string, filePath string) (string, error) { + exist, err := mcm.Exist(ctx, bucketName, filePath) + if err != nil { + return "", err + } + if !exist { + return "", errors.New("minio file manage cannot be found with filePath:" + filePath) + } + return filePath, nil +} + +// Reader returns the path of minio data if exists. +func (mcm *AzureChunkManager) Reader(ctx context.Context, bucketName string, filePath string) (FileReader, error) { + reader, err := mcm.getObject(ctx, bucketName, filePath, int64(0), int64(0)) + if err != nil { + log.Warn("failed to get object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) + return nil, err + } + return reader, nil +} + +func (mcm *AzureChunkManager) Size(ctx context.Context, bucketName string, filePath string) (int64, error) { + objectInfo, err := mcm.getObjectSize(ctx, bucketName, filePath) + if err != nil { + log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) + return 0, err + } + + return objectInfo, nil +} + +// +// Write writes the data to minio storage. +func (mcm *AzureChunkManager) Write(ctx context.Context, bucketName string, filePath string, content []byte) error { + err := mcm.putObject(ctx, bucketName, filePath, bytes.NewReader(content), int64(len(content))) + if err != nil { + log.Warn("failed to put object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) + return err + } + + return nil +} + +// MultiWrite saves multiple objects, the path is the key of @kvs. +// The object value is the value of @kvs. +func (mcm *AzureChunkManager) MultiWrite(ctx context.Context, bucketName string, kvs map[string][]byte) error { + var el error + for key, value := range kvs { + err := mcm.Write(ctx, bucketName, key, value) + if err != nil { + el = errors.New(fmt.Sprintf("failed to write %s", key)) + } + } + return el +} + +// Exist checks whether chunk is saved to minio storage. +func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) { + _, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) + if err != nil { + if IsErrNoSuchKey(err) { + return false, nil + } + log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + return false, err + } + return true, nil +} + +// Read reads the minio storage data if exists. +func (mcm *AzureChunkManager) Read(ctx context.Context, bucketName string, filePath string) ([]byte, error) { + object, err := mcm.getObject(ctx, bucketName, filePath, int64(0), int64(0)) + if err != nil { + log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + return nil, err + } + defer object.Close() + + // Prefetch object data + var empty []byte + _, err = object.Read(empty) + if err != nil { + errResponse := minio.ToErrorResponse(err) + if errResponse.Code == "NoSuchKey" { + return nil, WrapErrNoSuchKey(filePath) + } + log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) + return nil, err + } + size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) + if err != nil { + log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) + return nil, err + } + data, err := Read(object, size) + if err != nil { + errResponse := minio.ToErrorResponse(err) + if errResponse.Code == "NoSuchKey" { + return nil, WrapErrNoSuchKey(filePath) + } + log.Warn("failed to read object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) + return nil, err + } + return data, nil +} + +func (mcm *AzureChunkManager) MultiRead(ctx context.Context, bucketName string, keys []string) ([][]byte, error) { + var el error + var objectsValues [][]byte + for _, key := range keys { + objectValue, err := mcm.Read(ctx, bucketName, key) + if err != nil { + el = errors.New(fmt.Sprintf("failed to read %s %s", bucketName, key)) + } + objectsValues = append(objectsValues, objectValue) + } + + return objectsValues, el +} + +func (mcm *AzureChunkManager) ReadWithPrefix(ctx context.Context, bucketName string, prefix string) ([]string, [][]byte, error) { + objectsKeys, _, err := mcm.ListWithPrefix(ctx, bucketName, prefix, true) + if err != nil { + return nil, nil, err + } + objectsValues, err := mcm.MultiRead(ctx, bucketName, objectsKeys) + if err != nil { + return nil, nil, err + } + + return objectsKeys, objectsValues, nil +} + +func (mcm *AzureChunkManager) Mmap(ctx context.Context, bucketName string, filePath string) (*mmap.ReaderAt, error) { + return nil, errors.New("this method has not been implemented") +} + +// ReadAt reads specific position data of minio storage if exists. +func (mcm *AzureChunkManager) ReadAt(ctx context.Context, bucketName string, filePath string, off int64, length int64) ([]byte, error) { + return nil, errors.New("this method has not been implemented") + //if off < 0 || length < 0 { + // return nil, io.EOF + //} + // + //object, err := mcm.getObject(ctx, bucketName, filePath, off, length) + //if err != nil { + // log.Warn("failed to get object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) + // return nil, err + //} + //defer object.Close() + // + //data, err := Read(object, length) + //if err != nil { + // errResponse := minio.ToErrorResponse(err) + // if errResponse.Code == "NoSuchKey" { + // return nil, WrapErrNoSuchKey(filePath) + // } + // log.Warn("failed to read object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) + // return nil, err + //} + //return data, nil +} + +// Remove deletes an object with @key. +func (mcm *AzureChunkManager) Remove(ctx context.Context, bucketName string, filePath string) error { + err := mcm.removeObject(ctx, bucketName, filePath) + if err != nil { + log.Warn("failed to remove object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err)) + return err + } + return nil +} + +// MultiRemove deletes a objects with @keys. +func (mcm *AzureChunkManager) MultiRemove(ctx context.Context, bucketName string, keys []string) error { + var el errorutil.ErrorList + for _, key := range keys { + err := mcm.Remove(ctx, bucketName, key) + if err != nil { + el = append(el, err) + } + } + if len(el) == 0 { + return nil + } + return el +} + +// RemoveWithPrefix removes all objects with the same prefix @prefix from minio. +func (mcm *AzureChunkManager) RemoveWithPrefix(ctx context.Context, bucketName string, prefix string) error { + objects, err := mcm.listObjects(ctx, bucketName, prefix, true) + if err != nil { + return err + } + removeKeys := make([]string, 0) + for key := range objects { + removeKeys = append(removeKeys, key) + } + i := 0 + maxGoroutine := 10 + for i < len(removeKeys) { + runningGroup, groupCtx := errgroup.WithContext(ctx) + for j := 0; j < maxGoroutine && i < len(removeKeys); j++ { + key := removeKeys[i] + runningGroup.Go(func() error { + err := mcm.removeObject(groupCtx, bucketName, key) + if err != nil { + log.Warn("failed to remove object", zap.String("bucket", bucketName), zap.String("path", key), zap.Error(err)) + return err + } + return nil + }) + i++ + } + if err := runningGroup.Wait(); err != nil { + return err + } + } + return nil +} + +// ListWithPrefix returns objects with provided prefix. +func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error) { + objects, err := mcm.listObjects(ctx, bucketName, prefix, false) + if err != nil { + return nil, nil, err + } + if recursive { + var objectsKeys []string + var sizes []int64 + for object, contentLength := range objects { + objectsKeys = append(objectsKeys, object) + sizes = append(sizes, contentLength) + } + return objectsKeys, sizes, nil + } else { + var objectsKeys []string + var sizes []int64 + objectsKeysDict := make(map[string]bool, 0) + for object, _ := range objects { + keyWithoutPrefix := strings.Replace(object, prefix, "", 1) + if strings.Contains(keyWithoutPrefix, "/") { + var key string + if strings.HasPrefix(keyWithoutPrefix, "/") { + key = prefix + "/" + strings.Split(keyWithoutPrefix, "/")[1] + "/" + } else { + key = prefix + strings.Split(keyWithoutPrefix, "/")[0] + "/" + } + if _, exist := objectsKeysDict[key]; !exist { + objectsKeys = append(objectsKeys, key) + sizes = append(sizes, 0) + objectsKeysDict[key] = true + } + } else { + key := prefix + keyWithoutPrefix + if _, exist := objectsKeysDict[key]; !exist { + objectsKeys = append(objectsKeys, key) + sizes = append(sizes, 0) + objectsKeysDict[key] = true + } + } + } + return objectsKeys, sizes, nil + } + + //var objectsKeys []string + //var sizes []int64 + //tasks := list.New() + //tasks.PushBack(prefix) + //for tasks.Len() > 0 { + // e := tasks.Front() + // pre := e.Value.(string) + // tasks.Remove(e) + // + // // TODO add concurrent call if performance matters + // // only return current level per call + // objects, err := mcm.listObjects(ctx, bucketName, pre, false) + // if err != nil { + // return nil, nil, err + // } + // + // for object, contentLength := range objects { + // // with tailing "/", object is a "directory" + // if strings.HasSuffix(object, "/") && recursive { + // // enqueue when recursive is true + // if object != pre { + // tasks.PushBack(object) + // } + // continue + // } + // objectsKeys = append(objectsKeys, object) + // sizes = append(sizes, contentLength) + // } + //} + // + //return objectsKeys, sizes, nil +} + +func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) { + //resp, err := mcm.cli.DownloadStream(ctx, bucketName, objectName, nil) + //if err != nil { + // return nil, fmt.Errorf("storage: azure download stream %w", err) + //} + //return resp.Body, nil + + reader, err := mcm.client.GetObject(ctx, bucketName, objectName, offset, size) + switch err := err.(type) { + case *azcore.ResponseError: + if err.ErrorCode == string(bloberror.BlobNotFound) { + return nil, WrapErrNoSuchKey(objectName) + } + //case minio.ErrorResponse: + // if err.Code == "NoSuchKey" { + // return nil, WrapErrNoSuchKey(objectName) + // } + } + return reader, err +} + +func (mcm *AzureChunkManager) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error { + err := mcm.client.PutObject(ctx, bucketName, objectName, reader, objectSize) + return err +} + +func (mcm *AzureChunkManager) getObjectSize(ctx context.Context, bucketName, objectName string) (int64, error) { + info, err := mcm.client.StatObject(ctx, bucketName, objectName) + + switch err := err.(type) { + case *azcore.ResponseError: + if err.ErrorCode == string(bloberror.BlobNotFound) { + return info, WrapErrNoSuchKey(objectName) + } + //case minio.ErrorResponse: + // if err.Code == "NoSuchKey" { + // return nil, WrapErrNoSuchKey(objectName) + // } + } + + return info, err +} + +func (mcm *AzureChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) { + res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive) + return res, err +} + +func (mcm *AzureChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error { + err := mcm.client.RemoveObject(ctx, bucketName, objectName) + return err +} diff --git a/core/storage/azure_object_storage.go b/core/storage/azure_object_storage.go new file mode 100644 index 00000000..6bed8124 --- /dev/null +++ b/core/storage/azure_object_storage.go @@ -0,0 +1,169 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "fmt" + "io" + "os" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service" + + "github.com/zilliztech/milvus-backup/internal/util/retry" +) + +type AzureObjectStorage struct { + Client *service.Client + config *config +} + +type AzureClient struct { + cli *azblob.Client +} + +func NewAzureClient(ctx context.Context, cfg *config) (*azblob.Client, error) { + cred, err := azblob.NewSharedKeyCredential(cfg.accessKeyID, cfg.secretAccessKeyID) + if err != nil { + return nil, fmt.Errorf("storage: new azure shared key credential %w", err) + } + endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.accessKeyID) + cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil) + if err != nil { + return nil, fmt.Errorf("storage: new azure client %w", err) + } + + return cli, nil +} + +func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObjectStorage, error) { + var client *service.Client + var err error + if c.useIAM { + cred, credErr := azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{ + ClientID: os.Getenv("AZURE_CLIENT_ID"), + TenantID: os.Getenv("AZURE_TENANT_ID"), + TokenFilePath: os.Getenv("AZURE_FEDERATED_TOKEN_FILE"), + }) + if credErr != nil { + return nil, credErr + } + client, err = service.NewClient("https://"+c.accessKeyID+".blob."+c.address+"/", cred, &service.ClientOptions{}) + } else { + connectionString := os.Getenv("AZURE_STORAGE_CONNECTION_STRING") + if connectionString == "" { + connectionString = "DefaultEndpointsProtocol=https;AccountName=" + c.accessKeyID + + ";AccountKey=" + c.secretAccessKeyID + ";EndpointSuffix=" + c.address + } + client, err = service.NewClientFromConnectionString(connectionString, &service.ClientOptions{}) + } + if err != nil { + return nil, err + } + if c.bucketName == "" { + return nil, fmt.Errorf("invalid bucket name") + } + // check valid in first query + checkBucketFn := func() error { + _, err := client.NewContainerClient(c.bucketName).GetProperties(ctx, &container.GetPropertiesOptions{}) + if err != nil { + switch err := err.(type) { + case *azcore.ResponseError: + if c.createBucket && err.ErrorCode == string(bloberror.ContainerNotFound) { + _, createErr := client.NewContainerClient(c.bucketName).Create(ctx, &azblob.CreateContainerOptions{}) + if createErr != nil { + return createErr + } + return nil + } + } + } + return err + } + err = retry.Do(ctx, checkBucketFn, retry.Attempts(CheckBucketRetryAttempts)) + if err != nil { + return nil, err + } + return &AzureObjectStorage{Client: client, config: c}, nil +} + +func (aos *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) { + opts := azblob.DownloadStreamOptions{} + if offset > 0 { + opts.Range = azblob.HTTPRange{ + Offset: offset, + Count: size, + } + } + object, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts) + + if err != nil { + return nil, err + } + return object.Body, nil +} + +func (aos *AzureObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error { + _, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).UploadStream(ctx, reader, &azblob.UploadStreamOptions{}) + return err +} + +func (aos *AzureObjectStorage) StatObject(ctx context.Context, bucketName, objectName string) (int64, error) { + info, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).GetProperties(ctx, &blob.GetPropertiesOptions{}) + if err != nil { + return 0, err + } + return *info.ContentLength, nil +} + +func (aos *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]int64, error) { + pager := aos.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{ + Prefix: &prefix, + }) + // pager := aos.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ + // Prefix: &prefix, + // }) + + objects := map[string]int64{} + if pager.More() { + pageResp, err := pager.NextPage(context.Background()) + if err != nil { + return nil, err + } + for _, blob := range pageResp.Segment.BlobItems { + objects[*blob.Name] = *blob.Properties.ContentLength + } + } + return objects, nil +} + +func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error { + _, err := aos.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).Delete(ctx, &blob.DeleteOptions{}) + return err +} + +func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error { + fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.config.accessKeyID, fromBucketName, fromPath) + _, err := aos.Client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil) + return err +} diff --git a/core/storage/chunk_manager.go b/core/storage/chunk_manager.go new file mode 100644 index 00000000..79a258d2 --- /dev/null +++ b/core/storage/chunk_manager.go @@ -0,0 +1,60 @@ +package storage + +import ( + "context" + + "github.com/zilliztech/milvus-backup/core/paramtable" +) + +func NewChunkManager(ctx context.Context, params paramtable.BackupParams) (ChunkManager, error) { + engine := params.MinioCfg.StorageType + if engine == "azure" { + return newAzureChunkManagerWithParams(ctx, params) + } else { + return newMinioChunkManagerWithParams(ctx, params) + } + //switch engine { + //case "local": + // return newMinioChunkManagerWithParams(ctx, params) + // //return NewLocalChunkManager(RootPath(f.config.rootPath)), nil + //case "minio": + //case "s3": + //case "gcp": + //case "aliyun": + // return newMinioChunkManagerWithParams(ctx, params) + //case "azure": + // return newAzureChunkManagerWithParams(ctx, params) + //default: + // return nil, errors.New("no chunk manager implemented with engine: " + engine) + //} +} + +func newMinioChunkManagerWithParams(ctx context.Context, params paramtable.BackupParams) (*MinioChunkManager, error) { + c := newDefaultConfig() + c.address = params.MinioCfg.Address + ":" + params.MinioCfg.Port + c.accessKeyID = params.MinioCfg.AccessKeyID + c.secretAccessKeyID = params.MinioCfg.SecretAccessKey + c.useSSL = params.MinioCfg.UseSSL + c.bucketName = params.MinioCfg.BackupBucketName + c.rootPath = params.MinioCfg.RootPath + c.storageEngine = params.MinioCfg.StorageType + c.useIAM = params.MinioCfg.UseIAM + c.iamEndpoint = params.MinioCfg.IAMEndpoint + c.createBucket = true + return newMinioChunkManagerWithConfig(ctx, c) +} + +func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.BackupParams) (*AzureChunkManager, error) { + c := newDefaultConfig() + c.address = params.MinioCfg.Address + ":" + params.MinioCfg.Port + c.accessKeyID = params.MinioCfg.AccessKeyID + c.secretAccessKeyID = params.MinioCfg.SecretAccessKey + c.useSSL = params.MinioCfg.UseSSL + c.bucketName = params.MinioCfg.BackupBucketName + c.rootPath = params.MinioCfg.RootPath + c.storageEngine = params.MinioCfg.StorageType + c.useIAM = params.MinioCfg.UseIAM + c.iamEndpoint = params.MinioCfg.IAMEndpoint + c.createBucket = true + return NewAzureChunkManager(ctx, c) +} diff --git a/core/storage/minio_chunk_manager.go b/core/storage/minio_chunk_manager.go index bdd8ae8b..1015c6f5 100644 --- a/core/storage/minio_chunk_manager.go +++ b/core/storage/minio_chunk_manager.go @@ -21,6 +21,8 @@ import ( "golang.org/x/exp/mmap" ) +const NoSuchKey = "NoSuchKey" + var ( ErrNoSuchKey = errors.New("NoSuchKey") ) @@ -29,6 +31,10 @@ func WrapErrNoSuchKey(key string) error { return fmt.Errorf("%w(key=%s)", ErrNoSuchKey, key) } +func IsErrNoSuchKey(err error) bool { + return strings.HasPrefix(err.Error(), NoSuchKey) +} + var CheckBucketRetryAttempts uint = 20 // MinioChunkManager is responsible for read and write data stored in minio. @@ -58,7 +64,7 @@ func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunk var newMinioFn = minio.New var bucketLookupType = minio.BucketLookupAuto - switch c.cloudProvider { + switch c.storageEngine { case paramtable.CloudProviderAliyun: // auto doesn't work for aliyun, so we set to dns deliberately bucketLookupType = minio.BucketLookupDNS diff --git a/core/storage/options.go b/core/storage/options.go index 8852dd2a..aee4a9d9 100644 --- a/core/storage/options.go +++ b/core/storage/options.go @@ -10,8 +10,10 @@ type config struct { createBucket bool rootPath string useIAM bool - cloudProvider string iamEndpoint string + // Deprecated, use storageEngine instead + cloudProvider string + storageEngine string } func newDefaultConfig() *config { @@ -68,12 +70,6 @@ func UseIAM(useIAM bool) Option { } } -func CloudProvider(cloudProvider string) Option { - return func(c *config) { - c.cloudProvider = cloudProvider - } -} - func IAMEndpoint(iamEndpoint string) Option { return func(c *config) { c.iamEndpoint = iamEndpoint diff --git a/core/storage/types.go b/core/storage/types.go index 6255e128..bfc62191 100644 --- a/core/storage/types.go +++ b/core/storage/types.go @@ -36,6 +36,7 @@ type ChunkManager interface { ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error) // ReadWithPrefix reads files with same @prefix and returns contents. ReadWithPrefix(ctx context.Context, bucketName string, prefix string) ([]string, [][]byte, error) + // Not use Mmap(ctx context.Context, bucketName string, filePath string) (*mmap.ReaderAt, error) // ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read. // if all bytes are read, @err is io.EOF. diff --git a/example/prepare_data.py b/example/prepare_data.py index ccd7eb1e..531e0700 100644 --- a/example/prepare_data.py +++ b/example/prepare_data.py @@ -111,11 +111,9 @@ insert_result2 = hello_milvus2.insert(entities2) hello_milvus2.flush() -index_params = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"} -hello_milvus.create_index("embeddings", index_params) - - -hello_milvus2.create_index(field_name="var",index_name="scalar_index") +# index_params = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"} +# hello_milvus.create_index("embeddings", index_params) +# hello_milvus2.create_index(field_name="var",index_name="scalar_index") # index_params2 = {"index_type": "Trie"} # hello_milvus2.create_index("var", index_params2) diff --git a/go.mod b/go.mod index d7385baf..e3d48fad 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,16 @@ module github.com/zilliztech/milvus-backup go 1.18 require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 github.com/aliyun/credentials-go v1.3.0 github.com/blang/semver/v4 v4.0.0 github.com/cockroachdb/errors v1.9.1 github.com/gin-gonic/gin v1.8.1 github.com/golang/protobuf v1.5.2 github.com/google/btree v1.0.1 - github.com/google/uuid v1.1.2 + github.com/google/uuid v1.3.0 github.com/json-iterator/go v1.1.12 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 github.com/milvus-io/milvus-sdk-go/v2 v2.3.0 @@ -37,6 +40,8 @@ require ( require ( cloud.google.com/go v0.81.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/alibabacloud-go/debug v0.0.0-20190504072949-9472017b5c68 // indirect github.com/alibabacloud-go/tea v1.1.8 // indirect @@ -58,6 +63,7 @@ require ( github.com/go-playground/validator/v10 v10.10.0 // indirect github.com/goccy/go-json v0.9.7 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect @@ -66,6 +72,7 @@ require ( github.com/klauspost/cpuid v1.3.1 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -79,6 +86,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml v1.9.3 // indirect github.com/pelletier/go-toml/v2 v2.0.1 // indirect + github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/rs/xid v1.2.1 // indirect @@ -94,10 +102,10 @@ require ( go.etcd.io/etcd/api/v3 v3.5.0 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/crypto v0.11.0 // indirect - golang.org/x/net v0.12.0 // indirect - golang.org/x/sys v0.10.0 // indirect - golang.org/x/text v0.11.0 // indirect + golang.org/x/crypto v0.12.0 // indirect + golang.org/x/net v0.14.0 // indirect + golang.org/x/sys v0.11.0 // indirect + golang.org/x/text v0.12.0 // indirect golang.org/x/tools v0.11.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29 // indirect diff --git a/go.sum b/go.sum index 6ea338b8..36329ecb 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,22 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 h1:8q4SaHjFsClSvuVne0ID/5Ka8u3fcIHyqkLjcFpNRHQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0/go.mod h1:bjGvMhVMb+EEm3VRNQawDMUyMMjo+S5ewNjflkep/0Q= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.1 h1:/iHxaJhsFr0+xVFfbMr5vxz848jyiWuIEDhYq3y5odY= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.1/go.mod h1:bjGvMhVMb+EEm3VRNQawDMUyMMjo+S5ewNjflkep/0Q= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.1 h1:T8quHYlUGyb/oqtSTwqlCr1ilJHrDv+ZtpSfo+hm1BU= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.1/go.mod h1:gLa1CL2RNE4s7M3yopJ/p0iq5DdY6Yv5ZUt9MTRZOQM= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 h1:vcYCAze6p19qBW7MhZybIsqD8sMV8js0NyQM8JDnVtg= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0/go.mod h1:OQeznEEkTZ9OrhHJoDD8ZDq51FHgXjqtP9z6bEwBq9U= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 h1:nVocQV40OQne5613EeLayJiRAJuKlBGy+m22qWG+WRg= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0/go.mod h1:7QJP7dr2wznCMeqIrhMgWGf7XpAQnVrJqDm9nvV3Cu4= +github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 h1:WpB/QDNLpMw72xHJc34BNNykqSOeEJDAWkhf0u12/Jk= +github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -113,6 +129,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= @@ -196,7 +213,11 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM= +github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs= +github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= +github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -263,8 +284,9 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -353,6 +375,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= @@ -434,6 +458,8 @@ github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -611,8 +637,8 @@ golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWP golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= -golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -700,8 +726,8 @@ golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -788,6 +814,7 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -799,8 +826,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -817,8 +844,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=