From 4cc7f25b4c82a50e39b30417e14724697ce5ca8b Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Wed, 16 Aug 2023 13:51:30 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #46040 Signed-off-by: ti-chi-bot --- br/pkg/storage/BUILD.bazel | 5 +++ br/pkg/storage/azblob.go | 65 ++++++++++++++++++++++++++++++++++ br/pkg/storage/storage.go | 18 ++++++++++ br/pkg/storage/storage_test.go | 26 ++++++++++++++ br/pkg/task/stream.go | 2 ++ 5 files changed, 116 insertions(+) create mode 100644 br/pkg/storage/storage_test.go diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 810585525db8d..3a4b56dea6f00 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -67,11 +67,16 @@ go_test( "memstore_test.go", "parse_test.go", "s3_test.go", + "storage_test.go", "writer_test.go", ], embed = [":storage"], flaky = True, +<<<<<<< HEAD shard_count = 43, +======= + shard_count = 47, +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index c3d734ebe9a12..dc5b414d3406b 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -107,25 +107,59 @@ type sharedKeyClientBuilder struct { cred *azblob.SharedKeyCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } +<<<<<<< HEAD func (b *sharedKeyClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, getDefaultClientOptions()) +======= +func (b *sharedKeyClientBuilder) GetServiceClient() (*azblob.Client, error) { + return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, b.clientOptions) +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) } func (b *sharedKeyClientBuilder) GetAccountName() string { return b.accountName } +<<<<<<< HEAD +======= +// use SAS to access azure blob storage +type sasClientBuilder struct { + accountName string + // Example of serviceURL: https://.blob.core.windows.net/? + serviceURL string + + clientOptions *azblob.ClientOptions +} + +func (b *sasClientBuilder) GetServiceClient() (*azblob.Client, error) { + return azblob.NewClientWithNoCredential(b.serviceURL, b.clientOptions) +} + +func (b *sasClientBuilder) GetAccountName() string { + return b.accountName +} + +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) // use token to access azure blob storage type tokenClientBuilder struct { cred *azidentity.ClientSecretCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } +<<<<<<< HEAD func (b *tokenClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { return azblob.NewServiceClient(b.serviceURL, b.cred, getDefaultClientOptions()) +======= +func (b *tokenClientBuilder) GetServiceClient() (*azblob.Client, error) { + return azblob.NewClient(b.serviceURL, b.cred, b.clientOptions) +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) } func (b *tokenClientBuilder) GetAccountName() string { @@ -144,6 +178,31 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte return nil, errors.New("bucket(container) cannot be empty to access azure blob storage") } +<<<<<<< HEAD +======= + clientOptions := getDefaultClientOptions() + if opts != nil && opts.HTTPClient != nil { + clientOptions.Transport = opts.HTTPClient + } + + if len(options.AccountName) > 0 && len(options.AccessSig) > 0 { + serviceURL := options.Endpoint + if len(serviceURL) == 0 { + if strings.HasPrefix(options.AccessSig, "?") { + serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/%s", options.AccountName, options.AccessSig) + } else { + serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/?%s", options.AccountName, options.AccessSig) + } + } + return &sasClientBuilder{ + options.AccountName, + serviceURL, + + clientOptions, + }, nil + } + +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) if len(options.AccountName) > 0 && len(options.SharedKey) > 0 { serviceURL := options.Endpoint if len(serviceURL) == 0 { @@ -157,6 +216,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, options.AccountName, serviceURL, + + clientOptions, }, nil } @@ -185,6 +246,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?")) @@ -212,6 +275,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 2ccc67d4eab20..40664ac11df06 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -27,6 +27,8 @@ const ( GetObject Permission = "GetObject" // PutObject represents PutObject permission PutObject Permission = "PutObject" + + DefaultRequestConcurrency uint = 128 ) // WalkOption is the option of storage.WalkDir. @@ -190,3 +192,19 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %T is not supported yet", backend) } } + +// Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost` +// to the actual request concurrency to reuse tcp connection as much as possible. +func GetDefaultHttpClient(concurrency uint) *http.Client { + transport, _ := CloneDefaultHttpTransport() + transport.MaxIdleConns = int(concurrency) + transport.MaxIdleConnsPerHost = int(concurrency) + return &http.Client{ + Transport: transport, + } +} + +func CloneDefaultHttpTransport() (*http.Transport, bool) { + transport, ok := http.DefaultTransport.(*http.Transport) + return transport.Clone(), ok +} diff --git a/br/pkg/storage/storage_test.go b/br/pkg/storage/storage_test.go new file mode 100644 index 0000000000000..c6ca5c39b6a02 --- /dev/null +++ b/br/pkg/storage/storage_test.go @@ -0,0 +1,26 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package storage_test + +import ( + "net/http" + "testing" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" +) + +func TestDefaultHttpTransport(t *testing.T) { + transport, ok := storage.CloneDefaultHttpTransport() + require.True(t, ok) + require.True(t, transport.MaxConnsPerHost == 0) + require.True(t, transport.MaxIdleConns > 0) +} + +func TestDefaultHttpClient(t *testing.T) { + var concurrency uint = 128 + transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport) + require.True(t, ok) + require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost) + require.Equal(t, int(concurrency), transport.MaxIdleConns) +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 8c1e3490fc6d7..eec02d1adce14 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -135,6 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } storage, err := storage.New(ctx, u, &opts) if err != nil { @@ -1470,6 +1471,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } if err = client.SetStorage(ctx, u, &opts); err != nil { return nil, errors.Trace(err) From e4f6a9c299389551b743552b00d29c52ad170d49 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 16 Aug 2023 14:39:46 +0800 Subject: [PATCH 2/4] resolve conflicts Signed-off-by: Leavrth --- br/pkg/storage/BUILD.bazel | 6 +--- br/pkg/storage/azblob.go | 56 ++------------------------------------ 2 files changed, 4 insertions(+), 58 deletions(-) diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 3a4b56dea6f00..e0d108af81876 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -72,11 +72,7 @@ go_test( ], embed = [":storage"], flaky = True, -<<<<<<< HEAD - shard_count = 43, -======= - shard_count = 47, ->>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) + shard_count = 45, deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index dc5b414d3406b..a438d9b8f860e 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -111,39 +111,14 @@ type sharedKeyClientBuilder struct { clientOptions *azblob.ClientOptions } -<<<<<<< HEAD func (b *sharedKeyClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { - return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, getDefaultClientOptions()) -======= -func (b *sharedKeyClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, b.clientOptions) ->>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) + return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, b.clientOptions) } func (b *sharedKeyClientBuilder) GetAccountName() string { return b.accountName } -<<<<<<< HEAD -======= -// use SAS to access azure blob storage -type sasClientBuilder struct { - accountName string - // Example of serviceURL: https://.blob.core.windows.net/? - serviceURL string - - clientOptions *azblob.ClientOptions -} - -func (b *sasClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClientWithNoCredential(b.serviceURL, b.clientOptions) -} - -func (b *sasClientBuilder) GetAccountName() string { - return b.accountName -} - ->>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) // use token to access azure blob storage type tokenClientBuilder struct { cred *azidentity.ClientSecretCredential @@ -153,13 +128,8 @@ type tokenClientBuilder struct { clientOptions *azblob.ClientOptions } -<<<<<<< HEAD func (b *tokenClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { - return azblob.NewServiceClient(b.serviceURL, b.cred, getDefaultClientOptions()) -======= -func (b *tokenClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClient(b.serviceURL, b.cred, b.clientOptions) ->>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) + return azblob.NewServiceClient(b.serviceURL, b.cred, b.clientOptions) } func (b *tokenClientBuilder) GetAccountName() string { @@ -178,31 +148,11 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte return nil, errors.New("bucket(container) cannot be empty to access azure blob storage") } -<<<<<<< HEAD -======= clientOptions := getDefaultClientOptions() if opts != nil && opts.HTTPClient != nil { - clientOptions.Transport = opts.HTTPClient - } - - if len(options.AccountName) > 0 && len(options.AccessSig) > 0 { - serviceURL := options.Endpoint - if len(serviceURL) == 0 { - if strings.HasPrefix(options.AccessSig, "?") { - serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/%s", options.AccountName, options.AccessSig) - } else { - serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/?%s", options.AccountName, options.AccessSig) - } - } - return &sasClientBuilder{ - options.AccountName, - serviceURL, - - clientOptions, - }, nil + clientOptions.Transporter = opts.HTTPClient } ->>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) if len(options.AccountName) > 0 && len(options.SharedKey) > 0 { serviceURL := options.Endpoint if len(serviceURL) == 0 { From 4e4919b15c99eb6ad3daf59e0cf0e43d47fc1775 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 16 Aug 2023 14:46:45 +0800 Subject: [PATCH 3/4] resolve conflicts Signed-off-by: Leavrth --- br/pkg/stream/stream_mgr.go | 4 ++++ br/pkg/task/stream.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/br/pkg/stream/stream_mgr.go b/br/pkg/stream/stream_mgr.go index f998006a2dd17..5c5592a46e993 100644 --- a/br/pkg/stream/stream_mgr.go +++ b/br/pkg/stream/stream_mgr.go @@ -42,6 +42,10 @@ const ( metaDataWorkerPoolSize = 128 ) +func GetMetadataWorkerPoolSize() int { + return metaDataWorkerPoolSize +} + func GetStreamBackupMetaPrefix() string { return streamBackupMetaPrefix } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index eec02d1adce14..ed9668bc495aa 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -135,7 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, - HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), + HTTPClient: storage.GetDefaultHttpClient(uint(stream.GetMetadataWorkerPoolSize())), } storage, err := storage.New(ctx, u, &opts) if err != nil { @@ -1471,7 +1471,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, - HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), + HTTPClient: storage.GetDefaultHttpClient(uint(stream.GetMetadataWorkerPoolSize())), } if err = client.SetStorage(ctx, u, &opts); err != nil { return nil, errors.Trace(err) From deea668c395f382064592efca96fb19cbf9dc92a Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 16 Aug 2023 16:30:42 +0800 Subject: [PATCH 4/4] resolve conflicts Signed-off-by: Leavrth --- br/pkg/stream/stream_mgr.go | 4 ---- br/pkg/task/stream.go | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/br/pkg/stream/stream_mgr.go b/br/pkg/stream/stream_mgr.go index 95a29f64eb04e..42e8bf4459d6c 100644 --- a/br/pkg/stream/stream_mgr.go +++ b/br/pkg/stream/stream_mgr.go @@ -40,10 +40,6 @@ const ( streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint" ) -func GetMetadataWorkerPoolSize() int { - return metaDataWorkerPoolSize -} - func GetStreamBackupMetaPrefix() string { return streamBackupMetaPrefix } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index b2b8ba53c5346..0b195128f42a2 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -135,7 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, - HTTPClient: storage.GetDefaultHttpClient(uint(stream.GetMetadataWorkerPoolSize())), + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } storage, err := storage.New(ctx, u, &opts) if err != nil { @@ -1472,7 +1472,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, - HTTPClient: storage.GetDefaultHttpClient(uint(stream.GetMetadataWorkerPoolSize())), + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } if err = client.SetStorage(ctx, u, &opts); err != nil { return nil, errors.Trace(err)