Skip to content

Commit

Permalink
br: configure the httpclient for external storage (#46040)
Browse files Browse the repository at this point in the history
close #46011
  • Loading branch information
Leavrth authored Aug 16, 2023
1 parent 0fb21c5 commit 8822578
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 4 deletions.
3 changes: 2 additions & 1 deletion br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,12 @@ go_test(
"memstore_test.go",
"parse_test.go",
"s3_test.go",
"storage_test.go",
"writer_test.go",
],
embed = [":storage"],
flaky = True,
shard_count = 45,
shard_count = 47,
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
Expand Down
25 changes: 22 additions & 3 deletions br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,12 @@ type sharedKeyClientBuilder struct {
cred *azblob.SharedKeyCredential
accountName string
serviceURL string

clientOptions *azblob.ClientOptions
}

func (b *sharedKeyClientBuilder) GetServiceClient() (*azblob.Client, error) {
return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, getDefaultClientOptions())
return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, b.clientOptions)
}

func (b *sharedKeyClientBuilder) GetAccountName() string {
Expand All @@ -172,10 +174,12 @@ type sasClientBuilder struct {
accountName string
// Example of serviceURL: https://<account>.blob.core.windows.net/?<sas token>
serviceURL string

clientOptions *azblob.ClientOptions
}

func (b *sasClientBuilder) GetServiceClient() (*azblob.Client, error) {
return azblob.NewClientWithNoCredential(b.serviceURL, getDefaultClientOptions())
return azblob.NewClientWithNoCredential(b.serviceURL, b.clientOptions)
}

func (b *sasClientBuilder) GetAccountName() string {
Expand All @@ -187,10 +191,12 @@ type tokenClientBuilder struct {
cred *azidentity.ClientSecretCredential
accountName string
serviceURL string

clientOptions *azblob.ClientOptions
}

func (b *tokenClientBuilder) GetServiceClient() (*azblob.Client, error) {
return azblob.NewClient(b.serviceURL, b.cred, getDefaultClientOptions())
return azblob.NewClient(b.serviceURL, b.cred, b.clientOptions)
}

func (b *tokenClientBuilder) GetAccountName() string {
Expand All @@ -209,6 +215,11 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
return nil, errors.New("bucket(container) cannot be empty to access azure blob storage")
}

clientOptions := getDefaultClientOptions()
if opts != nil && opts.HTTPClient != nil {
clientOptions.Transport = opts.HTTPClient
}

if len(options.AccountName) > 0 && len(options.AccessSig) > 0 {
serviceURL := options.Endpoint
if len(serviceURL) == 0 {
Expand All @@ -221,6 +232,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
return &sasClientBuilder{
options.AccountName,
serviceURL,

clientOptions,
}, nil
}

Expand All @@ -237,6 +250,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
options.AccountName,
serviceURL,

clientOptions,
}, nil
}

Expand Down Expand Up @@ -265,6 +280,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
accountName,
serviceURL,

clientOptions,
}, nil
}
log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?"))
Expand Down Expand Up @@ -292,6 +309,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
accountName,
serviceURL,

clientOptions,
}, nil
}

Expand Down
18 changes: 18 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
GetObject Permission = "GetObject"
// PutObject represents PutObject permission
PutObject Permission = "PutObject"

DefaultRequestConcurrency uint = 128
)

// WalkOption is the option of storage.WalkDir.
Expand Down Expand Up @@ -198,3 +200,19 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %T is not supported yet", backend)
}
}

// Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost`
// to the actual request concurrency to reuse tcp connection as much as possible.
func GetDefaultHttpClient(concurrency uint) *http.Client {
transport, _ := CloneDefaultHttpTransport()
transport.MaxIdleConns = int(concurrency)
transport.MaxIdleConnsPerHost = int(concurrency)
return &http.Client{
Transport: transport,
}
}

func CloneDefaultHttpTransport() (*http.Transport, bool) {
transport, ok := http.DefaultTransport.(*http.Transport)
return transport.Clone(), ok
}
26 changes: 26 additions & 0 deletions br/pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package storage_test

import (
"net/http"
"testing"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)

func TestDefaultHttpTransport(t *testing.T) {
transport, ok := storage.CloneDefaultHttpTransport()
require.True(t, ok)
require.True(t, transport.MaxConnsPerHost == 0)
require.True(t, transport.MaxIdleConns > 0)
}

func TestDefaultHttpClient(t *testing.T) {
var concurrency uint = 128
transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport)
require.True(t, ok)
require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost)
require.Equal(t, int(concurrency), transport.MaxIdleConns)
}
2 changes: 2 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize),
}
storage, err := storage.New(ctx, u, &opts)
if err != nil {
Expand Down Expand Up @@ -1471,6 +1472,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize),
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit 8822578

Please sign in to comment.