From 4d7a529d3f0d7734f120e2d0596e7af380dd6ec7 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 11 Aug 2023 18:50:37 +0800 Subject: [PATCH 1/8] configure the httpclient for external storage Signed-off-by: Leavrth --- br/pkg/storage/storage.go | 33 +++++++++++++++++++++++++++++++++ br/pkg/task/stream.go | 2 ++ 2 files changed, 35 insertions(+) diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 894019a2ede8a..e3b1f11158af7 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -5,7 +5,9 @@ package storage import ( "context" "io" + "net" "net/http" + "time" "github.com/aws/aws-sdk-go/aws/request" "github.com/pingcap/errors" @@ -27,6 +29,8 @@ const ( GetObject Permission = "GetObject" // PutObject represents PutObject permission PutObject Permission = "PutObject" + + DefaultRequestConcurrency uint = 128 ) // WalkOption is the option of storage.WalkDir. @@ -169,6 +173,9 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt if opts == nil { opts = &ExternalStorageOptions{} } + if opts.HTTPClient == nil { + opts.HTTPClient = GetDefaultHttpClient(DefaultRequestConcurrency) + } switch backend := backend.Backend.(type) { case *backuppb.StorageBackend_Local: if backend.Local == nil { @@ -198,3 +205,29 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %T is not supported yet", backend) } } + +// copy from `http.defaultTransportDialContext` +func defaultTransportDialContext(dialer *net.Dialer) func(context.Context, string, string) (net.Conn, error) { + return dialer.DialContext +} + +// Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost` +// to the actual request concurrency to reuse tcp connection as much as possible. +func GetDefaultHttpClient(concurrency uint) *http.Client { + return &http.Client{ + // copy from `http.DefaultTransport` + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: defaultTransportDialContext(&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }), + ForceAttemptHTTP2: true, + MaxIdleConns: int(concurrency), + MaxIdleConnsPerHost: int(concurrency), + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + } +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index b4a7693097ec6..216b102d6836f 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -135,6 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } storage, err := storage.New(ctx, u, &opts) if err != nil { @@ -1471,6 +1472,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } if err = client.SetStorage(ctx, u, &opts); err != nil { return nil, errors.Trace(err) From 99a8e82e4fa5f45d0e6cc40057f4960c60b0f80e Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 14 Aug 2023 13:24:42 +0800 Subject: [PATCH 2/8] support custom http client for azure blob Signed-off-by: Leavrth --- br/pkg/storage/azblob.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index 48f19caaae8be..87a6584b7681e 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -157,10 +157,12 @@ type sharedKeyClientBuilder struct { cred *azblob.SharedKeyCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } func (b *sharedKeyClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, getDefaultClientOptions()) + return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, b.clientOptions) } func (b *sharedKeyClientBuilder) GetAccountName() string { @@ -172,10 +174,12 @@ type sasClientBuilder struct { accountName string // Example of serviceURL: https://.blob.core.windows.net/? serviceURL string + + clientOptions *azblob.ClientOptions } func (b *sasClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClientWithNoCredential(b.serviceURL, getDefaultClientOptions()) + return azblob.NewClientWithNoCredential(b.serviceURL, b.clientOptions) } func (b *sasClientBuilder) GetAccountName() string { @@ -187,10 +191,12 @@ type tokenClientBuilder struct { cred *azidentity.ClientSecretCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } func (b *tokenClientBuilder) GetServiceClient() (*azblob.Client, error) { - return azblob.NewClient(b.serviceURL, b.cred, getDefaultClientOptions()) + return azblob.NewClient(b.serviceURL, b.cred, b.clientOptions) } func (b *tokenClientBuilder) GetAccountName() string { @@ -209,6 +215,11 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte return nil, errors.New("bucket(container) cannot be empty to access azure blob storage") } + clientOptions := getDefaultClientOptions() + if opts.HTTPClient != nil { + clientOptions.Transport = opts.HTTPClient + } + if len(options.AccountName) > 0 && len(options.AccessSig) > 0 { serviceURL := options.Endpoint if len(serviceURL) == 0 { @@ -221,6 +232,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte return &sasClientBuilder{ options.AccountName, serviceURL, + + clientOptions, }, nil } @@ -237,6 +250,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, options.AccountName, serviceURL, + + clientOptions, }, nil } @@ -265,6 +280,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?")) @@ -292,6 +309,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } From da993988e19b3998fbe0284b7033fa95f9d419a5 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 15 Aug 2023 17:29:03 +0800 Subject: [PATCH 3/8] commit some suggestions Signed-off-by: Leavrth --- br/pkg/storage/azblob.go | 2 +- br/pkg/storage/storage.go | 30 +++++++++--------------------- br/pkg/storage/storage_test.go | 26 ++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 22 deletions(-) create mode 100644 br/pkg/storage/storage_test.go diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index 87a6584b7681e..25221bc10a882 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -216,7 +216,7 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte } clientOptions := getDefaultClientOptions() - if opts.HTTPClient != nil { + if opts != nil && opts.HTTPClient != nil { clientOptions.Transport = opts.HTTPClient } diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index e3b1f11158af7..6b864b2b03e9c 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -5,9 +5,7 @@ package storage import ( "context" "io" - "net" "net/http" - "time" "github.com/aws/aws-sdk-go/aws/request" "github.com/pingcap/errors" @@ -206,28 +204,18 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt } } -// copy from `http.defaultTransportDialContext` -func defaultTransportDialContext(dialer *net.Dialer) func(context.Context, string, string) (net.Conn, error) { - return dialer.DialContext -} - // Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost` // to the actual request concurrency to reuse tcp connection as much as possible. func GetDefaultHttpClient(concurrency uint) *http.Client { + transport, _ := CloneDefaultHttpTransport() + transport.MaxIdleConns = int(concurrency) + transport.MaxIdleConnsPerHost = int(concurrency) return &http.Client{ - // copy from `http.DefaultTransport` - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: defaultTransportDialContext(&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }), - ForceAttemptHTTP2: true, - MaxIdleConns: int(concurrency), - MaxIdleConnsPerHost: int(concurrency), - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - }, + Transport: transport, } } + +func CloneDefaultHttpTransport() (*http.Transport, bool) { + transport, ok := http.DefaultTransport.(*http.Transport) + return transport.Clone(), ok +} diff --git a/br/pkg/storage/storage_test.go b/br/pkg/storage/storage_test.go new file mode 100644 index 0000000000000..152c193eabf99 --- /dev/null +++ b/br/pkg/storage/storage_test.go @@ -0,0 +1,26 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package storage_test + +import ( + "net/http" + "testing" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" +) + +func TestDefaultHttpTransport(t *testing.T) { + transport, ok := storage.CloneDefaultHttpTransport() + require.True(t, ok) + require.True(t, transport.MaxConnsPerHost == 0) + require.True(t, transport.MaxIdleConns > 0) +} + +func TestDefaultHttpClient(t *testing.T) { + var concurrency uint = 128 + transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport) + require.True(t, ok) + require.Equal(t, transport.MaxIdleConnsPerHost, int(concurrency)) + require.Equal(t, transport.MaxIdleConns, int(concurrency)) +} From 1818e5de7383cdec0ed4e2bfb349881d60c40ded Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 15 Aug 2023 18:44:37 +0800 Subject: [PATCH 4/8] make bazel_prepare Signed-off-by: Leavrth --- br/pkg/storage/BUILD.bazel | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 81e3d4a3d2468..4e4c368282d31 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -72,11 +72,12 @@ go_test( "memstore_test.go", "parse_test.go", "s3_test.go", + "storage_test.go", "writer_test.go", ], embed = [":storage"], flaky = True, - shard_count = 45, + shard_count = 47, deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", From 0d32dcbc2360baf1953bda39a96a243463a39fbc Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 15 Aug 2023 18:45:54 +0800 Subject: [PATCH 5/8] commit some suggestions Signed-off-by: Leavrth --- br/pkg/storage/storage_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/storage/storage_test.go b/br/pkg/storage/storage_test.go index 152c193eabf99..c6ca5c39b6a02 100644 --- a/br/pkg/storage/storage_test.go +++ b/br/pkg/storage/storage_test.go @@ -21,6 +21,6 @@ func TestDefaultHttpClient(t *testing.T) { var concurrency uint = 128 transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport) require.True(t, ok) - require.Equal(t, transport.MaxIdleConnsPerHost, int(concurrency)) - require.Equal(t, transport.MaxIdleConns, int(concurrency)) + require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost) + require.Equal(t, int(concurrency), transport.MaxIdleConns) } From a373296397432536111158d5ea6f0fe6ca25b14c Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 16 Aug 2023 10:18:39 +0800 Subject: [PATCH 6/8] fix unit test Signed-off-by: Leavrth --- br/pkg/lightning/importer/precheck_impl.go | 2 ++ br/pkg/storage/storage.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/importer/precheck_impl.go b/br/pkg/lightning/importer/precheck_impl.go index ed3b034555906..c92c97ea5c9d7 100644 --- a/br/pkg/lightning/importer/precheck_impl.go +++ b/br/pkg/lightning/importer/precheck_impl.go @@ -17,6 +17,7 @@ import ( "cmp" "context" "fmt" + "net/http" "path/filepath" "reflect" "slices" @@ -432,6 +433,7 @@ func (ci *storagePermissionCheckItem) Check(ctx context.Context) (*precheck.Chec storage.ListObjects, storage.GetObject, }, + HTTPClient: &http.Client{}, }) if err != nil { theResult.Passed = false diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 6b864b2b03e9c..916baa7995faf 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -172,7 +172,7 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt opts = &ExternalStorageOptions{} } if opts.HTTPClient == nil { - opts.HTTPClient = GetDefaultHttpClient(DefaultRequestConcurrency) + //opts.HTTPClient = GetDefaultHttpClient(DefaultRequestConcurrency) } switch backend := backend.Backend.(type) { case *backuppb.StorageBackend_Local: From dceea4483d080ddb53ad1320fc088440b41d9062 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 16 Aug 2023 10:24:05 +0800 Subject: [PATCH 7/8] revert fix unit test Signed-off-by: Leavrth --- br/pkg/lightning/importer/precheck_impl.go | 2 -- br/pkg/storage/storage.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/br/pkg/lightning/importer/precheck_impl.go b/br/pkg/lightning/importer/precheck_impl.go index c92c97ea5c9d7..ed3b034555906 100644 --- a/br/pkg/lightning/importer/precheck_impl.go +++ b/br/pkg/lightning/importer/precheck_impl.go @@ -17,7 +17,6 @@ import ( "cmp" "context" "fmt" - "net/http" "path/filepath" "reflect" "slices" @@ -433,7 +432,6 @@ func (ci *storagePermissionCheckItem) Check(ctx context.Context) (*precheck.Chec storage.ListObjects, storage.GetObject, }, - HTTPClient: &http.Client{}, }) if err != nil { theResult.Passed = false diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 916baa7995faf..6b864b2b03e9c 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -172,7 +172,7 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt opts = &ExternalStorageOptions{} } if opts.HTTPClient == nil { - //opts.HTTPClient = GetDefaultHttpClient(DefaultRequestConcurrency) + opts.HTTPClient = GetDefaultHttpClient(DefaultRequestConcurrency) } switch backend := backend.Backend.(type) { case *backuppb.StorageBackend_Local: From a439ba22a8da8e8581d5e234ea46e13d2e268f57 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 16 Aug 2023 12:56:25 +0800 Subject: [PATCH 8/8] use default http client in default Signed-off-by: Leavrth --- br/pkg/storage/storage.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 6b864b2b03e9c..01e356f2e0c57 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -171,9 +171,6 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt if opts == nil { opts = &ExternalStorageOptions{} } - if opts.HTTPClient == nil { - opts.HTTPClient = GetDefaultHttpClient(DefaultRequestConcurrency) - } switch backend := backend.Backend.(type) { case *backuppb.StorageBackend_Local: if backend.Local == nil {