Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add connection to gcs and use different context for upload in case it … #727

Merged
merged 7 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ build/
_instances/
_coverage_/
__pycache__/
*.py[cod]
*.py[cod]
vendor/
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/jlaffaye/ftp v0.2.0
github.com/jolestar/go-commons-pool v2.0.0+incompatible
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/kelseyhightower/envconfig v1.4.0
github.com/klauspost/compress v1.16.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jolestar/go-commons-pool v2.0.0+incompatible h1:uHn5uRKsLLQSf9f1J5QPY2xREWx/YH+e4bIIXcAuAaE=
github.com/jolestar/go-commons-pool v2.0.0+incompatible/go.mod h1:ChJYIbIch0DMCSU6VU0t0xhPoWDR2mMFIQek3XWU0s8=
github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM=
github.com/jolestar/go-commons-pool/v2 v2.1.2/go.mod h1:r4NYccrkS5UqP1YQI1COyTZ9UjPJAAGTUxzcsK1kqhY=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
Expand Down
11 changes: 6 additions & 5 deletions pkg/backup/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
"github.com/Altinity/clickhouse-backup/pkg/custom"
"github.com/Altinity/clickhouse-backup/pkg/resumable"
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/eapache/go-resiliency/retrier"
"io"
"os"
"path"
Expand All @@ -20,6 +15,12 @@ import (
"sync/atomic"
"time"

"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
"github.com/Altinity/clickhouse-backup/pkg/custom"
"github.com/Altinity/clickhouse-backup/pkg/resumable"
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/eapache/go-resiliency/retrier"

"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

Expand Down
7 changes: 6 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package config
import (
"crypto/tls"
"fmt"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"math"
"os"
"runtime"
"strings"
"time"

s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"

"github.com/apex/log"
"github.com/kelseyhightower/envconfig"
"github.com/urfave/cli"
Expand Down Expand Up @@ -76,6 +77,9 @@ type GCSConfig struct {
StorageClass string `yaml:"storage_class" envconfig:"GCS_STORAGE_CLASS"`
ObjectLabels map[string]string `yaml:"object_labels" envconfig:"GCS_OBJECT_LABELS"`
CustomStorageClassMap map[string]string `yaml:"custom_storage_class_map" envconfig:"GCS_CUSTOM_STORAGE_CLASS_MAP"`
// NOTE: ClientPoolSize should be at least 2 times bigger than
// UploadConcurrency or DownloadConcurrency in each upload and download case
ClientPoolSize int `yaml:"client_pool_size" envconfig:"CLIENT_POOL_SIZE"`
minguyen9988 marked this conversation as resolved.
Show resolved Hide resolved
}

// AzureBlobConfig - Azure Blob settings section
Expand Down Expand Up @@ -544,6 +548,7 @@ func DefaultConfig() *Config {
CompressionLevel: 1,
CompressionFormat: "tar",
StorageClass: "STANDARD",
ClientPoolSize: 500,
},
COS: COSConfig{
RowURL: "",
Expand Down
113 changes: 101 additions & 12 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"encoding/base64"
"errors"
"fmt"
"google.golang.org/api/iterator"
"io"
"net/http"
"path"
"strings"
"time"

"google.golang.org/api/iterator"

"github.com/Altinity/clickhouse-backup/pkg/config"
pool "github.com/jolestar/go-commons-pool"
minguyen9988 marked this conversation as resolved.
Show resolved Hide resolved
"google.golang.org/api/option/internaloption"

"cloud.google.com/go/storage"
Expand All @@ -23,14 +25,19 @@ import (

// GCS - presents methods for manipulating data on GCS.
type GCS struct {
	client *storage.Client   // dedicated client, created in Connect and used outside the pool
	Config *config.GCSConfig // GCS section of the backup configuration
	// clientPool holds reusable storage clients; its MaxTotal is set from
	// Config.ClientPoolSize in Connect.
	clientPool *pool.ObjectPool
}

// debugGCSTransport wraps an http.RoundTripper and logs each GCS
// request/response (used when debug logging of GCS traffic is enabled).
type debugGCSTransport struct {
	base http.RoundTripper // underlying transport that performs the real round trip
}

// clientObject is the item stored in the client pool; it wraps a
// *storage.Client so it can be managed by go-commons-pool.
type clientObject struct {
	Client *storage.Client
}

func (w debugGCSTransport) RoundTrip(r *http.Request) (*http.Response, error) {
logMsg := fmt.Sprintf(">>> [GCS_REQUEST] >>> %v %v\n", r.Method, r.URL.String())
for h, values := range r.Header {
Expand Down Expand Up @@ -96,6 +103,30 @@ func (gcs *GCS) Connect(ctx context.Context) error {
clientOptions = append(clientOptions, option.WithHTTPClient(debugClient))
}

factory := pool.NewPooledObjectFactory(
func(context.Context) (interface{}, error) {
sClient, err := storage.NewClient(ctx, clientOptions...)
if err != nil {
return nil, err
}
return &clientObject{
Client: sClient,
},
nil
}, func(ctx context.Context, object *pool.PooledObject) error {
// destroy
return object.Object.(*clientObject).Client.Close()
}, func(ctx context.Context, object *pool.PooledObject) bool {
return true
}, func(ctx context.Context, object *pool.PooledObject) error {
// activate do nothing
return nil
}, func(ctx context.Context, object *pool.PooledObject) error {
// passivate do nothing
return nil
})
gcs.clientPool = pool.NewObjectPoolWithDefaultConfig(ctx, factory)
gcs.clientPool.Config.MaxTotal = gcs.Config.ClientPoolSize
gcs.client, err = storage.NewClient(ctx, clientOptions...)
return err
}
Expand All @@ -105,6 +136,13 @@ func (gcs *GCS) Close(ctx context.Context) error {
}

func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, process func(ctx context.Context, r RemoteFile) error) error {
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return err
}
pClient := pClientObj.(*clientObject).Client

rootPath := path.Join(gcs.Config.Path, gcsPath)
prefix := rootPath + "/"
if rootPath == "/" {
Expand All @@ -114,22 +152,25 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
if !recursive {
delimiter = "/"
}
it := gcs.client.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
it := pClient.Bucket(gcs.Config.Bucket).Objects(ctx, &storage.Query{
Prefix: prefix,
Delimiter: delimiter,
})
for {
object, err := it.Next()
if errors.Is(err, iterator.Done) {
gcs.clientPool.ReturnObject(ctx, pClientObj)
return nil
}
if err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
if object.Prefix != "" {
if err := process(ctx, &gcsFile{
name: strings.TrimPrefix(object.Prefix, rootPath),
}); err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
continue
Expand All @@ -139,17 +180,26 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
lastModified: object.Updated,
name: strings.TrimPrefix(object.Name, rootPath),
}); err != nil {
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return err
}
}
}

// GetFileReader returns a streaming reader for the object stored at
// gcs.Config.Path/key. A client is borrowed from the pool only to open the
// reader and is returned (or invalidated on failure) before this call exits.
//
// NOTE(review): the pooled client is returned while the caller is still
// reading; if the pool destroys (closes) that client before the reader is
// drained, subsequent reads may fail — confirm the pool's eviction policy
// tolerates this.
func (gcs *GCS) GetFileReader(ctx context.Context, key string) (io.ReadCloser, error) {
	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
	if err != nil {
		log.Errorf("can't get client connection from pool: %+v", err)
		return nil, err
	}
	pClient := pClientObj.(*clientObject).Client
	obj := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key))
	reader, err := obj.NewReader(ctx)
	if err != nil {
		// a failed open may indicate a broken connection, so drop the
		// client from the pool instead of returning it
		gcs.clientPool.InvalidateObject(ctx, pClientObj)
		return nil, err
	}
	gcs.clientPool.ReturnObject(ctx, pClientObj)
	return reader, nil
}

Expand All @@ -158,8 +208,15 @@ func (gcs *GCS) GetFileReaderWithLocalPath(ctx context.Context, key, _ string) (
}

func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error {
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return err
}
pClient := pClientObj.(*clientObject).Client
key = path.Join(gcs.Config.Path, key)
obj := gcs.client.Bucket(gcs.Config.Bucket).Object(key)
obj := pClient.Bucket(gcs.Config.Bucket).Object(key)

writer := obj.NewWriter(ctx)
writer.StorageClass = gcs.Config.StorageClass
if len(gcs.Config.ObjectLabels) > 0 {
Expand All @@ -168,21 +225,32 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
defer func() {
if err := writer.Close(); err != nil {
log.Warnf("can't close writer: %+v", err)
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
}()
buffer := make([]byte, 512*1024)
_, err := io.CopyBuffer(writer, r, buffer)
_, err = io.CopyBuffer(writer, r, buffer)
return err
}

func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
pClientObj, err := gcs.clientPool.BorrowObject(ctx)
if err != nil {
log.Errorf("can't get client connection from pool: %+v", err)
return nil, err
}
pClient := pClientObj.(*clientObject).Client
objAttr, err := pClient.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, ErrNotFound
}
gcs.clientPool.InvalidateObject(ctx, pClientObj)
return nil, err
}
gcs.clientPool.ReturnObject(ctx, pClientObj)
return &gcsFile{
size: objAttr.Size,
lastModified: objAttr.Updated,
Expand All @@ -191,8 +259,20 @@ func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
}

// deleteKey removes a single object from the bucket using a pooled client.
// key is the full object key (callers are expected to have already joined it
// with the configured path prefix).
func (gcs *GCS) deleteKey(ctx context.Context, key string) error {
	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
	if err != nil {
		log.Errorf("can't get client connection from pool: %+v", err)
		return err
	}
	pClient := pClientObj.(*clientObject).Client
	object := pClient.Bucket(gcs.Config.Bucket).Object(key)
	if err = object.Delete(ctx); err != nil {
		// NOTE(review): any delete error (including object-not-found)
		// discards the client; confirm this is intended, since a missing
		// object does not imply a broken connection
		gcs.clientPool.InvalidateObject(ctx, pClientObj)
		return err
	}
	gcs.clientPool.ReturnObject(ctx, pClientObj)
	return nil
}

func (gcs *GCS) DeleteFile(ctx context.Context, key string) error {
Expand All @@ -206,17 +286,26 @@ func (gcs *GCS) DeleteFileFromObjectDiskBackup(ctx context.Context, key string)
}

// CopyObject performs a server-side copy of srcBucket/srcKey into
// gcs.Config.Bucket under gcs.Config.ObjectDiskPath/dstKey and returns the
// size of the source object in bytes.
func (gcs *GCS) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
	pClientObj, err := gcs.clientPool.BorrowObject(ctx)
	if err != nil {
		log.Errorf("can't get client connection from pool: %+v", err)
		return 0, err
	}
	pClient := pClientObj.(*clientObject).Client
	dstKey = path.Join(gcs.Config.ObjectDiskPath, dstKey)
	src := pClient.Bucket(srcBucket).Object(srcKey)
	dst := pClient.Bucket(gcs.Config.Bucket).Object(dstKey)
	// fetch the source attributes first so the reported size matches what
	// is about to be copied
	attrs, err := src.Attrs(ctx)
	if err != nil {
		gcs.clientPool.InvalidateObject(ctx, pClientObj)
		return 0, err
	}
	if _, err = dst.CopierFrom(src).Run(ctx); err != nil {
		gcs.clientPool.InvalidateObject(ctx, pClientObj)
		return 0, err
	}
	log.Debugf("GCS->CopyObject %s/%s -> %s/%s", srcBucket, srcKey, gcs.Config.Bucket, dstKey)
	gcs.clientPool.ReturnObject(ctx, pClientObj)
	return attrs.Size, nil
}

Expand Down