Skip to content

Commit

Permalink
br: configure the httpclient for external storage (#46040) (#46141)
Browse files Browse the repository at this point in the history
close #46011
  • Loading branch information
ti-chi-bot authored Sep 8, 2023
1 parent 84f7f64 commit e006049
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 3 deletions.
3 changes: 2 additions & 1 deletion br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ go_test(
"memstore_test.go",
"parse_test.go",
"s3_test.go",
"storage_test.go",
"writer_test.go",
],
embed = [":storage"],
flaky = True,
shard_count = 43,
shard_count = 45,
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
Expand Down
19 changes: 17 additions & 2 deletions br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ type sharedKeyClientBuilder struct {
cred *azblob.SharedKeyCredential
accountName string
serviceURL string

clientOptions *azblob.ClientOptions
}

func (b *sharedKeyClientBuilder) GetServiceClient() (azblob.ServiceClient, error) {
return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, getDefaultClientOptions())
return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, b.clientOptions)
}

func (b *sharedKeyClientBuilder) GetAccountName() string {
Expand All @@ -122,10 +124,12 @@ type tokenClientBuilder struct {
cred *azidentity.ClientSecretCredential
accountName string
serviceURL string

clientOptions *azblob.ClientOptions
}

func (b *tokenClientBuilder) GetServiceClient() (azblob.ServiceClient, error) {
return azblob.NewServiceClient(b.serviceURL, b.cred, getDefaultClientOptions())
return azblob.NewServiceClient(b.serviceURL, b.cred, b.clientOptions)
}

func (b *tokenClientBuilder) GetAccountName() string {
Expand All @@ -144,6 +148,11 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
return nil, errors.New("bucket(container) cannot be empty to access azure blob storage")
}

clientOptions := getDefaultClientOptions()
if opts != nil && opts.HTTPClient != nil {
clientOptions.Transporter = opts.HTTPClient
}

if len(options.AccountName) > 0 && len(options.SharedKey) > 0 {
serviceURL := options.Endpoint
if len(serviceURL) == 0 {
Expand All @@ -157,6 +166,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
options.AccountName,
serviceURL,

clientOptions,
}, nil
}

Expand Down Expand Up @@ -185,6 +196,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
accountName,
serviceURL,

clientOptions,
}, nil
}
log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?"))
Expand Down Expand Up @@ -212,6 +225,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
accountName,
serviceURL,

clientOptions,
}, nil
}

Expand Down
18 changes: 18 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
GetObject Permission = "GetObject"
// PutObject represents PutObject permission
PutObject Permission = "PutObject"

DefaultRequestConcurrency uint = 128
)

// WalkOption is the option of storage.WalkDir.
Expand Down Expand Up @@ -190,3 +192,19 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %T is not supported yet", backend)
}
}

// Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost`
// to the actual request concurrency to reuse tcp connection as much as possible.
func GetDefaultHttpClient(concurrency uint) *http.Client {
transport, _ := CloneDefaultHttpTransport()
transport.MaxIdleConns = int(concurrency)
transport.MaxIdleConnsPerHost = int(concurrency)
return &http.Client{
Transport: transport,
}
}

func CloneDefaultHttpTransport() (*http.Transport, bool) {
transport, ok := http.DefaultTransport.(*http.Transport)
return transport.Clone(), ok
}
26 changes: 26 additions & 0 deletions br/pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package storage_test

import (
"net/http"
"testing"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)

func TestDefaultHttpTransport(t *testing.T) {
transport, ok := storage.CloneDefaultHttpTransport()
require.True(t, ok)
require.True(t, transport.MaxConnsPerHost == 0)
require.True(t, transport.MaxIdleConns > 0)
}

func TestDefaultHttpClient(t *testing.T) {
var concurrency uint = 128
transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport)
require.True(t, ok)
require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost)
require.Equal(t, int(concurrency), transport.MaxIdleConns)
}
2 changes: 2 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize),
}
storage, err := storage.New(ctx, u, &opts)
if err != nil {
Expand Down Expand Up @@ -1471,6 +1472,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize),
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit e006049

Please sign in to comment.