diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 81e3d4a3d2468..4e4c368282d31 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -72,11 +72,12 @@ go_test( "memstore_test.go", "parse_test.go", "s3_test.go", + "storage_test.go", "writer_test.go", ], embed = [":storage"], flaky = True, - shard_count = 45, + shard_count = 47, deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index 48f19caaae8be..25221bc10a882 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -157,10 +157,12 @@ type sharedKeyClientBuilder struct { cred *azblob.SharedKeyCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } func (b *sharedKeyClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, getDefaultClientOptions()) + return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, b.clientOptions) } func (b *sharedKeyClientBuilder) GetAccountName() string { @@ -172,10 +174,12 @@ type sasClientBuilder struct { accountName string // Example of serviceURL: https://.blob.core.windows.net/? serviceURL string + + clientOptions *azblob.ClientOptions } func (b *sasClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClientWithNoCredential(b.serviceURL, getDefaultClientOptions()) + return azblob.NewClientWithNoCredential(b.serviceURL, b.clientOptions) } func (b *sasClientBuilder) GetAccountName() string { @@ -187,10 +191,12 @@ type tokenClientBuilder struct { cred *azidentity.ClientSecretCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } func (b *tokenClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClient(b.serviceURL, b.cred, getDefaultClientOptions()) + return azblob.NewClient(b.serviceURL, b.cred, b.clientOptions) } func (b *tokenClientBuilder) GetAccountName() string { @@ -209,6 +215,11 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte return nil, errors.New("bucket(container) cannot be empty to access azure blob storage") } + clientOptions := getDefaultClientOptions() + if opts != nil && opts.HTTPClient != nil { + clientOptions.Transport = opts.HTTPClient + } + if len(options.AccountName) > 0 && len(options.AccessSig) > 0 { serviceURL := options.Endpoint if len(serviceURL) == 0 { @@ -221,6 +232,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte return &sasClientBuilder{ options.AccountName, serviceURL, + + clientOptions, }, nil } @@ -237,6 +250,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, options.AccountName, serviceURL, + + clientOptions, }, nil } @@ -265,6 +280,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?")) @@ -292,6 +309,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 894019a2ede8a..01e356f2e0c57 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -27,6 +27,8 @@ const ( GetObject Permission = "GetObject" // PutObject represents PutObject permission PutObject Permission = "PutObject" + + DefaultRequestConcurrency uint = 128 ) // WalkOption is the option of storage.WalkDir. @@ -198,3 +200,19 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %T is not supported yet", backend) } } + +// Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost` +// to the actual request concurrency to reuse tcp connection as much as possible. +func GetDefaultHttpClient(concurrency uint) *http.Client { + transport, _ := CloneDefaultHttpTransport() + transport.MaxIdleConns = int(concurrency) + transport.MaxIdleConnsPerHost = int(concurrency) + return &http.Client{ + Transport: transport, + } +} + +func CloneDefaultHttpTransport() (*http.Transport, bool) { + transport, ok := http.DefaultTransport.(*http.Transport) + return transport.Clone(), ok +} diff --git a/br/pkg/storage/storage_test.go b/br/pkg/storage/storage_test.go new file mode 100644 index 0000000000000..c6ca5c39b6a02 --- /dev/null +++ b/br/pkg/storage/storage_test.go @@ -0,0 +1,26 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package storage_test + +import ( + "net/http" + "testing" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" +) + +func TestDefaultHttpTransport(t *testing.T) { + transport, ok := storage.CloneDefaultHttpTransport() + require.True(t, ok) + require.True(t, transport.MaxConnsPerHost == 0) + require.True(t, transport.MaxIdleConns > 0) +} + +func TestDefaultHttpClient(t *testing.T) { + var concurrency uint = 128 + transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport) + require.True(t, ok) + require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost) + require.Equal(t, int(concurrency), transport.MaxIdleConns) +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index b4a7693097ec6..216b102d6836f 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -135,6 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } storage, err := storage.New(ctx, u, &opts) if err != nil { @@ -1471,6 +1472,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } if err = client.SetStorage(ctx, u, &opts); err != nil { return nil, errors.Trace(err)