From e0060497f910473ea84644bc0ccd2d03aa6d8f1b Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 8 Sep 2023 14:09:44 +0800 Subject: [PATCH] br: configure the httpclient for external storage (#46040) (#46141) close pingcap/tidb#46011 --- br/pkg/storage/BUILD.bazel | 3 ++- br/pkg/storage/azblob.go | 19 +++++++++++++++++-- br/pkg/storage/storage.go | 18 ++++++++++++++++++ br/pkg/storage/storage_test.go | 26 ++++++++++++++++++++++++++ br/pkg/task/stream.go | 2 ++ 5 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 br/pkg/storage/storage_test.go diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 810585525db8d..e0d108af81876 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -67,11 +67,12 @@ go_test( "memstore_test.go", "parse_test.go", "s3_test.go", + "storage_test.go", "writer_test.go", ], embed = [":storage"], flaky = True, - shard_count = 43, + shard_count = 45, deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index c3d734ebe9a12..a438d9b8f860e 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -107,10 +107,12 @@ type sharedKeyClientBuilder struct { cred *azblob.SharedKeyCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } func (b *sharedKeyClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { - return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, getDefaultClientOptions()) + return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, b.clientOptions) } func (b *sharedKeyClientBuilder) GetAccountName() string { @@ -122,10 +124,12 @@ type tokenClientBuilder struct { cred *azidentity.ClientSecretCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } func (b *tokenClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { - return azblob.NewServiceClient(b.serviceURL, b.cred, getDefaultClientOptions()) + return azblob.NewServiceClient(b.serviceURL, b.cred, b.clientOptions) } func (b *tokenClientBuilder) GetAccountName() string { @@ -144,6 +148,11 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte return nil, errors.New("bucket(container) cannot be empty to access azure blob storage") } + clientOptions := getDefaultClientOptions() + if opts != nil && opts.HTTPClient != nil { + clientOptions.Transporter = opts.HTTPClient + } + if len(options.AccountName) > 0 && len(options.SharedKey) > 0 { serviceURL := options.Endpoint if len(serviceURL) == 0 { @@ -157,6 +166,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, options.AccountName, serviceURL, + + clientOptions, }, nil } @@ -185,6 +196,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?")) @@ -212,6 +225,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 2ccc67d4eab20..40664ac11df06 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -27,6 +27,8 @@ const ( GetObject Permission = "GetObject" // PutObject represents PutObject permission PutObject Permission = "PutObject" + + DefaultRequestConcurrency uint = 128 ) // WalkOption is the option of storage.WalkDir. @@ -190,3 +192,19 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %T is not supported yet", backend) } } + +// Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost` +// to the actual request concurrency to reuse tcp connection as much as possible. +func GetDefaultHttpClient(concurrency uint) *http.Client { + transport, _ := CloneDefaultHttpTransport() + transport.MaxIdleConns = int(concurrency) + transport.MaxIdleConnsPerHost = int(concurrency) + return &http.Client{ + Transport: transport, + } +} + +func CloneDefaultHttpTransport() (*http.Transport, bool) { + transport, ok := http.DefaultTransport.(*http.Transport) + return transport.Clone(), ok +} diff --git a/br/pkg/storage/storage_test.go b/br/pkg/storage/storage_test.go new file mode 100644 index 0000000000000..c6ca5c39b6a02 --- /dev/null +++ b/br/pkg/storage/storage_test.go @@ -0,0 +1,26 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package storage_test + +import ( + "net/http" + "testing" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" +) + +func TestDefaultHttpTransport(t *testing.T) { + transport, ok := storage.CloneDefaultHttpTransport() + require.True(t, ok) + require.True(t, transport.MaxConnsPerHost == 0) + require.True(t, transport.MaxIdleConns > 0) +} + +func TestDefaultHttpClient(t *testing.T) { + var concurrency uint = 128 + transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport) + require.True(t, ok) + require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost) + require.Equal(t, int(concurrency), transport.MaxIdleConns) +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 53ff04f079232..0b195128f42a2 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -135,6 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } storage, err := storage.New(ctx, u, &opts) if err != nil { @@ -1471,6 +1472,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } if err = client.SetStorage(ctx, u, &opts); err != nil { return nil, errors.Trace(err)