Skip to content

Commit

Permalink
Merge branch 'master' into reduce-test-time
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Dec 8, 2023
2 parents 12da5f4 + d8550f0 commit 3729fa8
Show file tree
Hide file tree
Showing 21 changed files with 237 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
env:
WORKER_ID: ${{ matrix.worker_id }}
WORKER_COUNT: 13
JOB_COUNT: 10 # 11, 12 13 are for other integrations jobs
JOB_COUNT: 10 # 11, 12, 13 are for other integrations jobs
run: |
make ci-test-job JOB_COUNT=$(($JOB_COUNT)) JOB_INDEX=$WORKER_ID
mv covprofile covprofile_$WORKER_ID
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ ci-test-job: install-tools dashboard-ui
else \
for mod in $(shell ./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX)); do cd $$mod && $(MAKE) ci-test-job && cd $(ROOT_PATH) > /dev/null && cat $$mod/covprofile >> covprofile; done; \
fi
@$(FAILPOINT_DISABLE)

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso

Expand Down
66 changes: 49 additions & 17 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
)

const (
defaultCallerID = "pd-http-client"
httpScheme = "http"
httpsScheme = "https"
networkErrorStatus = "network error"
Expand Down Expand Up @@ -79,6 +80,8 @@ type Client interface {
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
WithCallerID(string) Client
// WithRespHandler sets and returns a new client with the given HTTP response handler.
// This allows the caller to customize how the response is handled, including error handling logic.
// Additionally, it is important for the caller to handle the content of the response body properly
Expand All @@ -89,11 +92,20 @@ type Client interface {

var _ Client = (*client)(nil)

type client struct {
// clientInner is the inner implementation of the PD HTTP client, which will
// implement some internal logics, such as HTTP client, service discovery, etc.
type clientInner struct {
pdAddrs []string
tlsConf *tls.Config
cli *http.Client
}

type client struct {
// Wrap this struct is to make sure the inner implementation
// won't be exposed and cloud be consistent during the copy.
inner *clientInner

callerID string
respHandler func(resp *http.Response, res interface{}) error

requestCounter *prometheus.CounterVec
Expand All @@ -106,15 +118,15 @@ type ClientOption func(c *client)
// WithHTTPClient configures the client with the given initialized HTTP client.
func WithHTTPClient(cli *http.Client) ClientOption {
return func(c *client) {
c.cli = cli
c.inner.cli = cli
}
}

// WithTLSConfig configures the client with the given TLS config.
// This option won't work if the client is configured with WithHTTPClient.
func WithTLSConfig(tlsConf *tls.Config) ClientOption {
return func(c *client) {
c.tlsConf = tlsConf
c.inner.tlsConf = tlsConf
}
}

Expand All @@ -134,7 +146,7 @@ func NewClient(
pdAddrs []string,
opts ...ClientOption,
) Client {
c := &client{}
c := &client{inner: &clientInner{}, callerID: defaultCallerID}
// Apply the options first.
for _, opt := range opts {
opt(c)
Expand All @@ -143,22 +155,22 @@ func NewClient(
for i, addr := range pdAddrs {
if !strings.HasPrefix(addr, httpScheme) {
var scheme string
if c.tlsConf != nil {
if c.inner.tlsConf != nil {
scheme = httpsScheme
} else {
scheme = httpScheme
}
pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr)
}
}
c.pdAddrs = pdAddrs
c.inner.pdAddrs = pdAddrs
// Init the HTTP client if it's not configured.
if c.cli == nil {
c.cli = &http.Client{Timeout: defaultTimeout}
if c.tlsConf != nil {
if c.inner.cli == nil {
c.inner.cli = &http.Client{Timeout: defaultTimeout}
if c.inner.tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = c.tlsConf
c.cli.Transport = transport
transport.TLSClientConfig = c.inner.tlsConf
c.inner.cli.Transport = transport
}
}

Expand All @@ -167,12 +179,22 @@ func NewClient(

// Close closes the HTTP client.
func (c *client) Close() {
if c.cli != nil {
c.cli.CloseIdleConnections()
if c.inner == nil {
return
}
if c.inner.cli != nil {
c.inner.cli.CloseIdleConnections()
}
log.Info("[pd] http client closed")
}

// WithCallerID sets and returns a new client with the given caller ID.
func (c *client) WithCallerID(callerID string) Client {
newClient := *c
newClient.callerID = callerID
return &newClient
}

// WithRespHandler sets and returns a new client with the given HTTP response handler.
func (c *client) WithRespHandler(
handler func(resp *http.Response, res interface{}) error,
Expand All @@ -196,13 +218,19 @@ func (c *client) execDuration(name string, duration time.Duration) {
c.executionDuration.WithLabelValues(name).Observe(duration.Seconds())
}

// Header key definition constants.
const (
pdAllowFollowerHandleKey = "PD-Allow-Follower-Handle"
componentSignatureKey = "component"
)

// HeaderOption configures the HTTP header.
type HeaderOption func(header http.Header)

// WithAllowFollowerHandle sets the header field to allow a PD follower to handle this request.
func WithAllowFollowerHandle() HeaderOption {
return func(header http.Header) {
header.Set("PD-Allow-Follower-Handle", "true")
header.Set(pdAllowFollowerHandleKey, "true")
}
}

Expand All @@ -218,8 +246,8 @@ func (c *client) requestWithRetry(
err error
addr string
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
for idx := 0; idx < len(c.inner.pdAddrs); idx++ {
addr = c.inner.pdAddrs[idx]
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...)
if err == nil {
break
Expand All @@ -239,6 +267,8 @@ func (c *client) request(
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", url),
zap.String("method", method),
zap.String("caller-id", c.callerID),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, method, url, body)
Expand All @@ -249,8 +279,10 @@ func (c *client) request(
for _, opt := range headerOpts {
opt(req.Header)
}
req.Header.Set(componentSignatureKey, c.callerID)

start := time.Now()
resp, err := c.cli.Do(req)
resp, err := c.inner.cli.Do(req)
if err != nil {
c.reqCounter(name, networkErrorStatus)
log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...)
Expand Down
73 changes: 73 additions & 0 deletions client/http/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"context"
"net/http"
"testing"

"github.com/stretchr/testify/require"
)

// requestChecker is used to check the HTTP request sent by the client.
type requestChecker struct {
checker func(req *http.Request) error
}

// RoundTrip implements the `http.RoundTripper` interface.
func (rc *requestChecker) RoundTrip(req *http.Request) (resp *http.Response, err error) {
return &http.Response{StatusCode: http.StatusOK}, rc.checker(req)
}

func newHTTPClientWithRequestChecker(checker func(req *http.Request) error) *http.Client {
return &http.Client{
Transport: &requestChecker{checker: checker},
}
}

func TestPDAllowFollowerHandleHeader(t *testing.T) {
re := require.New(t)
var expectedVal string
httpClient := newHTTPClientWithRequestChecker(func(req *http.Request) error {
val := req.Header.Get(pdAllowFollowerHandleKey)
if val != expectedVal {
re.Failf("PD allow follower handler header check failed",
"should be %s, but got %s", expectedVal, val)
}
return nil
})
c := NewClient([]string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
c.GetRegions(context.Background())
expectedVal = "true"
c.GetHistoryHotRegions(context.Background(), &HistoryHotRegionsRequest{})
}

func TestCallerID(t *testing.T) {
re := require.New(t)
expectedVal := defaultCallerID
httpClient := newHTTPClientWithRequestChecker(func(req *http.Request) error {
val := req.Header.Get(componentSignatureKey)
if val != expectedVal {
re.Failf("Caller ID header check failed",
"should be %s, but got %s", expectedVal, val)
}
return nil
})
c := NewClient([]string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
c.GetRegions(context.Background())
expectedVal = "test"
c.WithCallerID(expectedVal).GetRegions(context.Background())
}
2 changes: 1 addition & 1 deletion client/http/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestMergeRegionsInfo(t *testing.T) {
regionsInfo := regionsInfo1.Merge(regionsInfo2)
re.Equal(int64(2), regionsInfo.Count)
re.Equal(2, len(regionsInfo.Regions))
re.Equal(append(regionsInfo1.Regions, regionsInfo2.Regions...), regionsInfo.Regions)
re.Subset(regionsInfo.Regions, append(regionsInfo1.Regions, regionsInfo2.Regions...))
}

func TestRuleStartEndKey(t *testing.T) {
Expand Down
15 changes: 9 additions & 6 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, uint32, error)
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error)
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
Expand Down Expand Up @@ -526,10 +526,10 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) {
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
return nil, nil, 0, err
return nil, nil, time.Duration(0), 0, err
}
return gc.onRequestWait(ctx, info)
}
Expand Down Expand Up @@ -1176,7 +1176,7 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {

func (gc *groupCostController) onRequestWait(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) {
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
Expand All @@ -1185,6 +1185,7 @@ func (gc *groupCostController) onRequestWait(
gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()
var waitDuration time.Duration

if !gc.burstable.Load() {
var err error
Expand Down Expand Up @@ -1217,6 +1218,7 @@ func (gc *groupCostController) onRequestWait(
}
gc.requestRetryCounter.Inc()
time.Sleep(retryInterval)
waitDuration += retryInterval
}
if err != nil {
gc.failedRequestCounter.Inc()
Expand All @@ -1226,9 +1228,10 @@ func (gc *groupCostController) onRequestWait(
failpoint.Inject("triggerUpdate", func() {
gc.lowRUNotifyChan <- struct{}{}
})
return nil, nil, 0, err
return nil, nil, waitDuration, 0, err
}
gc.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

gc.mu.Lock()
Expand All @@ -1245,7 +1248,7 @@ func (gc *groupCostController) onRequestWait(
*gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter
gc.mu.Unlock()

return delta, penalty, gc.meta.Priority, nil
return delta, penalty, waitDuration, gc.meta.Priority, nil
}

func (gc *groupCostController) onResponse(
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestRequestAndResponseConsumption(t *testing.T) {
kvCalculator := gc.getKVCalculator()
for idx, testCase := range testCases {
caseNum := fmt.Sprintf("case %d", idx)
consumption, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req)
consumption, _, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req)
re.NoError(err, caseNum)
re.Equal(priority, gc.meta.Priority)
expectedConsumption := &rmpb.Consumption{}
Expand Down
1 change: 1 addition & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
Loading

0 comments on commit 3729fa8

Please sign in to comment.