From 3b4ca92ec5ad0255655fe75301177dab4bd274a4 Mon Sep 17 00:00:00 2001 From: minguyen Date: Tue, 22 Aug 2023 04:08:10 -0500 Subject: [PATCH 1/6] add connection to gcs and use different context for upload incase it got cancel by another thread --- .gitignore | 3 +- go.mod | 1 + go.sum | 2 + pkg/backup/upload.go | 13 ++--- pkg/config/config.go | 7 ++- pkg/storage/gcs.go | 123 +++++++++++++++++++++++++++++++++++++------ 6 files changed, 126 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index 646104a8..5d635a69 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ build/ _instances/ _coverage_/ __pycache__/ -*.py[cod] \ No newline at end of file +*.py[cod] +vendor/ \ No newline at end of file diff --git a/go.mod b/go.mod index 2c2c7c93..6b9e810f 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/jlaffaye/ftp v0.2.0 + github.com/jolestar/go-commons-pool v2.0.0+incompatible github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/kelseyhightower/envconfig v1.4.0 github.com/klauspost/compress v1.16.6 diff --git a/go.sum b/go.sum index 6850d4ae..b4416202 100644 --- a/go.sum +++ b/go.sum @@ -259,6 +259,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jolestar/go-commons-pool v2.0.0+incompatible h1:uHn5uRKsLLQSf9f1J5QPY2xREWx/YH+e4bIIXcAuAaE= +github.com/jolestar/go-commons-pool v2.0.0+incompatible/go.mod h1:ChJYIbIch0DMCSU6VU0t0xhPoWDR2mMFIQek3XWU0s8= github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM= github.com/jolestar/go-commons-pool/v2 v2.1.2/go.mod h1:r4NYccrkS5UqP1YQI1COyTZ9UjPJAAGTUxzcsK1kqhY= github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= diff --git a/pkg/backup/upload.go b/pkg/backup/upload.go index 9901b237..58094d25 100644 --- a/pkg/backup/upload.go +++ b/pkg/backup/upload.go @@ -5,11 +5,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/Altinity/clickhouse-backup/pkg/clickhouse" - "github.com/Altinity/clickhouse-backup/pkg/custom" - "github.com/Altinity/clickhouse-backup/pkg/resumable" - "github.com/Altinity/clickhouse-backup/pkg/status" - "github.com/eapache/go-resiliency/retrier" "io" "os" "path" @@ -20,6 +15,12 @@ import ( "sync/atomic" "time" + "github.com/Altinity/clickhouse-backup/pkg/clickhouse" + "github.com/Altinity/clickhouse-backup/pkg/custom" + "github.com/Altinity/clickhouse-backup/pkg/resumable" + "github.com/Altinity/clickhouse-backup/pkg/status" + "github.com/eapache/go-resiliency/retrier" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -130,7 +131,7 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str log.Debugf("prepare table concurrent semaphore with concurrency=%d len(tablesForUpload)=%d", b.cfg.General.UploadConcurrency, len(tablesForUpload)) uploadSemaphore := semaphore.NewWeighted(int64(b.cfg.General.UploadConcurrency)) - uploadGroup, uploadCtx := errgroup.WithContext(ctx) + uploadGroup, uploadCtx := errgroup.WithContext(context.Background()) for i, table := range tablesForUpload { if err := uploadSemaphore.Acquire(uploadCtx, 1); err != nil { diff --git a/pkg/config/config.go b/pkg/config/config.go index ae600df5..9e9593df 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,13 +3,14 @@ package config import ( "crypto/tls" "fmt" - s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "math" "os" "runtime" "strings" "time" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/apex/log" "github.com/kelseyhightower/envconfig" "github.com/urfave/cli" @@ -76,6 +77,9 @@ type GCSConfig struct { StorageClass string `yaml:"storage_class" envconfig:"GCS_STORAGE_CLASS"` ObjectLabels map[string]string `yaml:"object_labels" envconfig:"GCS_OBJECT_LABELS"` CustomStorageClassMap map[string]string `yaml:"custom_storage_class_map" envconfig:"GCS_CUSTOM_STORAGE_CLASS_MAP"` + // NOTE: ClientPoolSize should be atleast 2 times bigger than + // UploadConcurrency or DownloadConcurrency in each upload and download case + ClientPoolSize int `yaml:"client_pool_size" envconfig:"CLIENT_POOL_SIZE"` } // AzureBlobConfig - Azure Blob settings section @@ -544,6 +548,7 @@ func DefaultConfig() *Config { CompressionLevel: 1, CompressionFormat: "tar", StorageClass: "STANDARD", + ClientPoolSize: 500, }, COS: COSConfig{ RowURL: "", diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 8801742c..79b76866 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -5,14 +5,16 @@ import ( "encoding/base64" "errors" "fmt" - "google.golang.org/api/iterator" "io" "net/http" "path" "strings" "time" + "google.golang.org/api/iterator" + "github.com/Altinity/clickhouse-backup/pkg/config" + pool "github.com/jolestar/go-commons-pool" "google.golang.org/api/option/internaloption" "cloud.google.com/go/storage" @@ -23,14 +25,19 @@ import ( // GCS - presents methods for manipulate data on GCS type GCS struct { - client *storage.Client - Config *config.GCSConfig + client *storage.Client + Config *config.GCSConfig + clientPool *pool.ObjectPool } type debugGCSTransport struct { base http.RoundTripper } +type clientObject struct { + Client *storage.Client +} + func (w debugGCSTransport) RoundTrip(r *http.Request) (*http.Response, error) { logMsg := fmt.Sprintf(">>> [GCS_REQUEST] >>> %v %v\n", r.Method, r.URL.String()) for h, values := range r.Header { @@ -96,6 +103,30 @@ func (gcs *GCS) Connect(ctx context.Context) error { clientOptions = append(clientOptions, option.WithHTTPClient(debugClient)) } + factory := pool.NewPooledObjectFactory( + func(context.Context) (interface{}, error) { + sClient, err := storage.NewClient(ctx, clientOptions...) + if err != nil { + return nil, err + } + return &clientObject{ + Client: sClient, + }, + nil + }, func(ctx context.Context, object *pool.PooledObject) error { + // destroy + return object.Object.(*clientObject).Client.Close() + }, func(ctx context.Context, object *pool.PooledObject) bool { + return true + }, func(ctx context.Context, object *pool.PooledObject) error { + // activate do nothing + return nil + }, func(ctx context.Context, object *pool.PooledObject) error { + // passivate do nothing + return nil + }) + gcs.clientPool = pool.NewObjectPoolWithDefaultConfig(ctx, factory) + gcs.clientPool.Config.MaxTotal = gcs.Config.ClientPoolSize gcs.client, err = storage.NewClient(ctx, clientOptions...) return err } @@ -105,6 +136,13 @@ func (gcs *GCS) Close(ctx context.Context) error { } func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, process func(ctx context.Context, r RemoteFile) error) error { + pClientObj, err := gcs.clientPool.BorrowObject(ctx) + if err != nil { + log.Errorf("can't get client connection from pool: %+v", err) + return err + } + pClient := pClientObj.(*clientObject).Client + rootPath := path.Join(gcs.Config.Path, gcsPath) prefix := rootPath + "/" if rootPath == "/" { @@ -114,22 +152,25 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces if !recursive { delimiter = "/" } - it := gcs.client.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{ + it := pClient.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{ Prefix: prefix, Delimiter: delimiter, }) for { object, err := it.Next() if errors.Is(err, iterator.Done) { + gcs.clientPool.ReturnObject(ctx, pClientObj) return nil } if err != nil { + gcs.clientPool.InvalidateObject(ctx, pClientObj) return err } if object.Prefix != "" { if err := process(ctx, &gcsFile{ name: strings.TrimPrefix(object.Prefix, rootPath), }); err != nil { + gcs.clientPool.InvalidateObject(ctx, pClientObj) return err } continue @@ -139,17 +180,26 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces lastModified: object.Updated, name: strings.TrimPrefix(object.Name, rootPath), }); err != nil { + gcs.clientPool.InvalidateObject(ctx, pClientObj) return err } } } func (gcs *GCS) GetFileReader(ctx context.Context, key string) (io.ReadCloser, error) { - obj := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)) + pClientObj, err := gcs.clientPool.BorrowObject(ctx) + if err != nil { + log.Errorf("can't get client connection from pool: %+v", err) + return nil, err + } + pClient := pClientObj.(*clientObject).Client + obj := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)) reader, err := obj.NewReader(ctx) if err != nil { + gcs.clientPool.InvalidateObject(ctx, pClientObj) return nil, err } + gcs.clientPool.ReturnObject(ctx, pClientObj) return reader, nil } @@ -158,9 +208,17 @@ func (gcs *GCS) GetFileReaderWithLocalPath(ctx context.Context, key, _ string) ( } func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error { + pClientObj, err := gcs.clientPool.BorrowObject(ctx) + if err != nil { + log.Errorf("can't get client connection from pool: %+v", err) + return err + } + pClient := pClientObj.(*clientObject).Client key = path.Join(gcs.Config.Path, key) - obj := gcs.client.Bucket(gcs.Config.Bucket).Object(key) - writer := obj.NewWriter(ctx) + obj := pClient.Bucket(gcs.Config.Bucket).Object(key) + + writer_ctx := context.Background() + writer := obj.NewWriter(writer_ctx) writer.StorageClass = gcs.Config.StorageClass if len(gcs.Config.ObjectLabels) > 0 { writer.Metadata = gcs.Config.ObjectLabels @@ -168,21 +226,33 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error defer func() { if err := writer.Close(); err != nil { log.Warnf("can't close writer: %+v", err) + gcs.clientPool.InvalidateObject(ctx, pClientObj) + return } + gcs.clientPool.ReturnObject(ctx, pClientObj) }() buffer := make([]byte, 512*1024) - _, err := io.CopyBuffer(writer, r, buffer) + _, err = io.CopyBuffer(writer, r, buffer) return err } func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) { - objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx) + pClientObj, err := gcs.clientPool.BorrowObject(ctx) + if err != nil { + log.Errorf("can't get client connection from pool: %+v", err) + return nil, err + } + pClient := pClientObj.(*clientObject).Client + gcs_ctx := context.Background() + objAttr, err := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(gcs_ctx) if err != nil { if errors.Is(err, storage.ErrObjectNotExist) { return nil, ErrNotFound } + gcs.clientPool.InvalidateObject(ctx, pClientObj) return nil, err } + gcs.clientPool.ReturnObject(ctx, pClientObj) return &gcsFile{ size: objAttr.Size, lastModified: objAttr.Updated, @@ -191,8 +261,21 @@ func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) { } func (gcs *GCS) deleteKey(ctx context.Context, key string) error { - object := gcs.client.Bucket(gcs.Config.Bucket).Object(key) - return object.Delete(ctx) + pClientObj, err := gcs.clientPool.BorrowObject(ctx) + if err != nil { + log.Errorf("can't get client connection from pool: %+v", err) + return err + } + pClient := pClientObj.(*clientObject).Client + object := pClient.Bucket(gcs.Config.Bucket).Object(key) + gcs_ctx := context.Background() + err = object.Delete(gcs_ctx) + if err != nil { + gcs.clientPool.InvalidateObject(ctx, pClientObj) + return err + } + gcs.clientPool.ReturnObject(ctx, pClientObj) + return nil } func (gcs *GCS) DeleteFile(ctx context.Context, key string) error { @@ -206,17 +289,27 @@ func (gcs *GCS) DeleteFileFromObjectDiskBackup(ctx context.Context, key string) } func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) { + pClientObj, err := gcs.clientPool.BorrowObject(ctx) + if err != nil { + log.Errorf("can't get client connection from pool: %+v", err) + return 0, err + } + pClient := pClientObj.(*clientObject).Client dstKey = path.Join(gcs.Config.ObjectDiskPath, dstKey) - src := gcs.client.Bucket(srcBucket).Object(srcKey) - dst := gcs.client.Bucket(gcs.Config.Bucket).Object(dstKey) - attrs, err := src.Attrs(ctx) + src := pClient.Bucket(srcBucket).Object(srcKey) + dst := pClient.Bucket(gcs.Config.Bucket).Object(dstKey) + gcs_ctx := context.Background() + attrs, err := src.Attrs(gcs_ctx) if err != nil { + gcs.clientPool.InvalidateObject(ctx, pClientObj) return 0, err } - if _, err = dst.CopierFrom(src).Run(ctx); err != nil { + if _, err = dst.CopierFrom(src).Run(gcs_ctx); err != nil { + gcs.clientPool.InvalidateObject(ctx, pClientObj) return 0, err } log.Debugf("GCS->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, gcs.Config.Bucket, dstKey) + gcs.clientPool.ReturnObject(ctx, pClientObj) return attrs.Size, nil } From 85bada974ca75831196615bdae7ece656ced6725 Mon Sep 17 00:00:00 2001 From: Minh Duc Nguyen Date: Wed, 23 Aug 2023 02:10:24 -0500 Subject: [PATCH 2/6] save --- pkg/backup/upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/backup/upload.go b/pkg/backup/upload.go index 58094d25..f58ee1d7 100644 --- a/pkg/backup/upload.go +++ b/pkg/backup/upload.go @@ -131,7 +131,7 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str log.Debugf("prepare table concurrent semaphore with concurrency=%d len(tablesForUpload)=%d", b.cfg.General.UploadConcurrency, len(tablesForUpload)) uploadSemaphore := semaphore.NewWeighted(int64(b.cfg.General.UploadConcurrency)) - uploadGroup, uploadCtx := errgroup.WithContext(context.Background()) + uploadGroup, uploadCtx := errgroup.WithContext(ctx) for i, table := range tablesForUpload { if err := uploadSemaphore.Acquire(uploadCtx, 1); err != nil { From 536e785589540ef1e7e7d1a66bd14586600450a3 Mon Sep 17 00:00:00 2001 From: Minh Duc Nguyen Date: Wed, 23 Aug 2023 02:19:12 -0500 Subject: [PATCH 3/6] keep ctx --- pkg/storage/gcs.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 79b76866..af09d300 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -217,8 +217,7 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error key = path.Join(gcs.Config.Path, key) obj := pClient.Bucket(gcs.Config.Bucket).Object(key) - writer_ctx := context.Background() - writer := obj.NewWriter(writer_ctx) + writer := obj.NewWriter(ctx) writer.StorageClass = gcs.Config.StorageClass if len(gcs.Config.ObjectLabels) > 0 { writer.Metadata = gcs.Config.ObjectLabels From 03515a21cb2f4a136a602cc79d40446a0e48f29d Mon Sep 17 00:00:00 2001 From: Minh Duc Nguyen Date: Wed, 23 Aug 2023 02:20:48 -0500 Subject: [PATCH 4/6] keep ctx --- pkg/storage/gcs.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index af09d300..147937cf 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -242,8 +242,7 @@ func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) { return nil, err } pClient := pClientObj.(*clientObject).Client - gcs_ctx := context.Background() - objAttr, err := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(gcs_ctx) + objAttr, err := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx) if err != nil { if errors.Is(err, storage.ErrObjectNotExist) { return nil, ErrNotFound @@ -267,8 +266,7 @@ func (gcs *GCS) deleteKey(ctx context.Context, key string) error { } pClient := pClientObj.(*clientObject).Client object := pClient.Bucket(gcs.Config.Bucket).Object(key) - gcs_ctx := context.Background() - err = object.Delete(gcs_ctx) + err = object.Delete(ctx) if err != nil { gcs.clientPool.InvalidateObject(ctx, pClientObj) return err @@ -297,13 +295,12 @@ func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string dstKey = path.Join(gcs.Config.ObjectDiskPath, dstKey) src := pClient.Bucket(srcBucket).Object(srcKey) dst := pClient.Bucket(gcs.Config.Bucket).Object(dstKey) - gcs_ctx := context.Background() - attrs, err := src.Attrs(gcs_ctx) + attrs, err := src.Attrs(ctx) if err != nil { gcs.clientPool.InvalidateObject(ctx, pClientObj) return 0, err } - if _, err = dst.CopierFrom(src).Run(gcs_ctx); err != nil { + if _, err = dst.CopierFrom(src).Run(ctx); err != nil { gcs.clientPool.InvalidateObject(ctx, pClientObj) return 0, err } From c3b2dd5e18676f4b4427dbc139762c4e622e40f2 Mon Sep 17 00:00:00 2001 From: Minh Duc Nguyen Date: Wed, 23 Aug 2023 03:10:42 -0500 Subject: [PATCH 5/6] use v2 --- go.mod | 1 - go.sum | 2 -- pkg/storage/gcs.go | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 6b9e810f..2c2c7c93 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,6 @@ require ( github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/jlaffaye/ftp v0.2.0 - github.com/jolestar/go-commons-pool v2.0.0+incompatible github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/kelseyhightower/envconfig v1.4.0 github.com/klauspost/compress v1.16.6 diff --git a/go.sum b/go.sum index b4416202..6850d4ae 100644 --- a/go.sum +++ b/go.sum @@ -259,8 +259,6 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/jolestar/go-commons-pool v2.0.0+incompatible h1:uHn5uRKsLLQSf9f1J5QPY2xREWx/YH+e4bIIXcAuAaE= -github.com/jolestar/go-commons-pool v2.0.0+incompatible/go.mod h1:ChJYIbIch0DMCSU6VU0t0xhPoWDR2mMFIQek3XWU0s8= github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM= github.com/jolestar/go-commons-pool/v2 v2.1.2/go.mod h1:r4NYccrkS5UqP1YQI1COyTZ9UjPJAAGTUxzcsK1kqhY= github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 147937cf..93c71f2c 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -14,7 +14,7 @@ import ( "google.golang.org/api/iterator" "github.com/Altinity/clickhouse-backup/pkg/config" - pool "github.com/jolestar/go-commons-pool" + pool "github.com/jolestar/go-commons-pool/v2" "google.golang.org/api/option/internaloption" "cloud.google.com/go/storage" From a51731d3011ab4695c88b7d56ea8b0ace74eec03 Mon Sep 17 00:00:00 2001 From: Minh Duc Nguyen Date: Wed, 23 Aug 2023 03:23:30 -0500 Subject: [PATCH 6/6] change to GCS_CLIENT_POOL_SIZE --- pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 9e9593df..9273ed58 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -79,7 +79,7 @@ type GCSConfig struct { CustomStorageClassMap map[string]string `yaml:"custom_storage_class_map" envconfig:"GCS_CUSTOM_STORAGE_CLASS_MAP"` // NOTE: ClientPoolSize should be atleast 2 times bigger than // UploadConcurrency or DownloadConcurrency in each upload and download case - ClientPoolSize int `yaml:"client_pool_size" envconfig:"CLIENT_POOL_SIZE"` + ClientPoolSize int `yaml:"client_pool_size" envconfig:"GCS_CLIENT_POOL_SIZE"` } // AzureBlobConfig - Azure Blob settings section