Skip to content

Commit

Permalink
Merge branch 'master' into fix_handle_msg_send
Browse files Browse the repository at this point in the history
  • Loading branch information
HuSharp authored Apr 11, 2024
2 parents 9635e67 + 67461dd commit 18e96a6
Show file tree
Hide file tree
Showing 80 changed files with 2,118 additions and 482 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
TOTAL_JOBS: ${{needs.chunks.outputs.job-total}}
run: for i in $(seq 1 $TOTAL_JOBS); do cat covprofile_$i >> covprofile; done
- name: Send coverage
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v4.2.0
with:
token: ${{ secrets.CODECOV }}
file: ./covprofile
Expand Down
18 changes: 15 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ BUILD_BIN_PATH := $(ROOT_PATH)/bin

build: pd-server pd-ctl pd-recover

tools: pd-tso-bench pd-heartbeat-bench regions-dump stores-dump pd-api-bench
tools: pd-tso-bench pd-heartbeat-bench regions-dump stores-dump pd-api-bench pd-ut

PD_SERVER_DEP :=
ifeq ($(SWAGGER), 1)
Expand Down Expand Up @@ -108,7 +108,6 @@ pd-server-basic:
.PHONY: pre-build build tools pd-server pd-server-basic

# Tools

# pd-ctl: build tools/pd-ctl/main.go into $(BUILD_BIN_PATH)/pd-ctl.
# $@ is the target name, so the output file and source directory follow the rule name.
pd-ctl:
	cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/$@ $@/main.go
pd-tso-bench:
Expand All @@ -127,8 +126,12 @@ regions-dump:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/regions-dump regions-dump/main.go
# stores-dump: build tools/stores-dump/main.go into $(BUILD_BIN_PATH)/stores-dump with cgo disabled.
# $@ is the target name, so the output file and source directory follow the rule name.
stores-dump:
	cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/$@ $@/main.go
# pd-ut: build tools/pd-ut/ut.go into $(BUILD_BIN_PATH)/pd-ut.
# It lists pd-xprog as a prerequisite, so the xprog helper is built first.
pd-ut: pd-xprog
	cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ut pd-ut/ut.go
# pd-xprog: build tools/pd-ut/xprog.go with the `xprog` build tag into $(BUILD_BIN_PATH)/xprog.
pd-xprog:
	cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -tags xprog -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/xprog pd-ut/xprog.go

# pd-xprog names a command, not a file it produces (the binary is written to
# $(BUILD_BIN_PATH)/xprog), so declare it phony: otherwise a stray file called
# "pd-xprog" would make the target look up to date and skip the build.
.PHONY: pd-xprog

.PHONY: pd-ctl pd-tso-bench pd-recover pd-analysis pd-heartbeat-bench simulator regions-dump stores-dump pd-api-bench
.PHONY: pd-ctl pd-tso-bench pd-recover pd-analysis pd-heartbeat-bench simulator regions-dump stores-dump pd-api-bench pd-ut

#### Docker image ####

Expand Down Expand Up @@ -225,6 +228,12 @@ failpoint-disable: install-tools

#### Test ####

# ut: run the unit tests through the pd-ut runner with the race detector on.
# $(FAILPOINT_ENABLE) runs before the tests, and $(CLEAN_UT_BINARY) and
# $(FAILPOINT_DISABLE) run after them. Make stops a recipe at the first failing
# line, so the test line performs the same cleanup itself on failure and then
# re-raises the runner's exit status; otherwise a failed run would leave
# failpoints enabled and test binaries behind.
# NOTE(review): assumes FAILPOINT_DISABLE and CLEAN_UT_BINARY expand to plain
# shell command lists (they are already used as whole recipe lines) - confirm.
ut: pd-ut
	@$(FAILPOINT_ENABLE)
	./bin/pd-ut run --race || { rc=$$?; $(CLEAN_UT_BINARY); $(FAILPOINT_DISABLE); exit $$rc; }
	@$(CLEAN_UT_BINARY)
	@$(FAILPOINT_DISABLE)

# PACKAGE_DIRECTORIES: $(PACKAGES) with every "$(PD_PKG)/" occurrence removed.
PACKAGE_DIRECTORIES := $(subst $(PD_PKG)/,,$(PACKAGES))
# TEST_PKGS: the entries of $(PACKAGES) that contain at least one *_test.go file.
# The shell pipeline lists the directories holding such files (case-insensitive
# name match), de-duplicates them and rewrites the leading "." to
# "github.com/tikv/pd"; $(filter) then keeps only the $(PACKAGES) entries that
# equal one of those generated paths. Evaluated once at parse time (":=").
TEST_PKGS := $(filter $(shell find . -iname "*_test.go" -exec dirname {} \; | \
sort -u | sed -e "s/^\./github.com\/tikv\/pd/"),$(PACKAGES))
Expand Down Expand Up @@ -303,13 +312,16 @@ split:

clean: failpoint-disable clean-test clean-build

# CLEAN_UT_BINARY: shell command that deletes every '*.test.bin' file below the
# current directory. find hands the paths to rm itself ("-exec ... {} +"), so
# names containing whitespace or quotes are passed intact instead of being
# re-split by xargs, and nothing is executed when there is no match.
CLEAN_UT_BINARY := find . -name '*.test.bin' -exec rm -f {} +

# clean-test: remove artefacts left behind by test runs.
# It deletes the /tmp/test_pd*, /tmp/pd-tests* and /tmp/test_etcd* paths and
# the playground.log under $(REAL_CLUSTER_TEST_PATH), clears the Go test cache,
# and finally runs $(CLEAN_UT_BINARY).
# The "# Cleaning test tmp..." recipe line is handed to the shell as a comment;
# make echoes it, so it doubles as a progress message.
clean-test:
# Cleaning test tmp...
rm -rf /tmp/test_pd*
rm -rf /tmp/pd-tests*
rm -rf /tmp/test_etcd*
rm -f $(REAL_CLUSTER_TEST_PATH)/playground.log
go clean -testcache
@$(CLEAN_UT_BINARY)

clean-build:
# Cleaning building files...
Expand Down
22 changes: 16 additions & 6 deletions client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

GO_TOOLS_BIN_PATH := $(shell pwd)/../.tools/bin
ROOT_PATH := $(shell pwd)/..
GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin
PATH := $(GO_TOOLS_BIN_PATH):$(PATH)
SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)

default: static tidy test

test:
CGO_ENABLE=1 go test ./... -race -cover
test: failpoint-enable
CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; }
$(MAKE) failpoint-disable

basic-test:
CGO_ENABLE=1 go test ./...
basic-test: failpoint-enable
CGO_ENABLED=1 go test ./... || { $(MAKE) failpoint-disable && exit 1; }
$(MAKE) failpoint-disable

ci-test-job:
CGO_ENABLED=1 go test ./... -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client
if [ -f covprofile ]; then rm covprofile; fi
CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=covprofile -coverpkg=../...

# failpoint-enable: delegate to the failpoint-enable target of the Makefile in
# $(ROOT_PATH) (the parent directory). $(MAKE) is used so flags such as -n and
# -j are propagated to the sub-make.
failpoint-enable:
cd $(ROOT_PATH) && $(MAKE) failpoint-enable

# failpoint-disable: delegate to the failpoint-disable target of the Makefile in
# $(ROOT_PATH), mirroring failpoint-enable above.
failpoint-disable:
cd $(ROOT_PATH) && $(MAKE) failpoint-disable

install-tools:
cd .. && $(MAKE) install-tools
Expand Down
33 changes: 20 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,10 @@ type GlobalConfigItem struct {
PayLoad []byte
}

// Client is a PD (Placement Driver) RPC client.
// It should not be used after calling Close().
type Client interface {
// GetClusterID gets the cluster ID from PD.
GetClusterID(ctx context.Context) uint64
// RPCClient is a PD (Placement Driver) RPC and related mcs client which can only call RPC.
type RPCClient interface {
// GetAllMembers gets the members Info from PD
GetAllMembers(ctx context.Context) ([]*pdpb.Member, error)
// GetLeaderURL returns current leader's URL. It returns "" before
// syncing leader from server.
GetLeaderURL() string
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
Expand Down Expand Up @@ -133,17 +127,12 @@ type Client interface {
StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error
// WatchGlobalConfig returns a stream with all global config and updates
WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value any) error

// GetExternalTimestamp returns external timestamp
GetExternalTimestamp(ctx context.Context) (uint64, error)
// SetExternalTimestamp sets external timestamp
SetExternalTimestamp(ctx context.Context, timestamp uint64) error

// GetServiceDiscovery returns ServiceDiscovery
GetServiceDiscovery() ServiceDiscovery

// TSOClient is the TSO client.
TSOClient
// MetaStorageClient is the meta storage client.
Expand All @@ -154,6 +143,24 @@ type Client interface {
GCClient
// ResourceManagerClient manages resource group metadata and token assignment.
ResourceManagerClient
}

// Client is a PD (Placement Driver) RPC client.
// It should not be used after calling Close().
type Client interface {
RPCClient

// GetClusterID gets the cluster ID from PD.
GetClusterID(ctx context.Context) uint64
// GetLeaderURL returns current leader's URL. It returns "" before
// syncing leader from server.
GetLeaderURL() string
// GetServiceDiscovery returns ServiceDiscovery
GetServiceDiscovery() ServiceDiscovery

// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value any) error

// Close closes the client.
Close()
}
Expand Down
2 changes: 1 addition & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
var (
ErrClientListResourceGroup = errors.Normalize("get all resource group failed, %v", errors.RFCCodeText("PD:client:ErrClientListResourceGroup"))
ErrClientResourceGroupConfigUnavailable = errors.Normalize("resource group config is unavailable, %v", errors.RFCCodeText("PD:client:ErrClientResourceGroupConfigUnavailable"))
ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled"))
ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation, estimated wait time %s, ltb state is %.2f:%.2f", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled"))
)

// ErrClientGetResourceGroup is the error type for getting resource group.
Expand Down
16 changes: 16 additions & 0 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ const (
defaultTargetPeriod = 5 * time.Second
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
defaultMaxWaitDuration = 30 * time.Second
// defaultWaitRetryTimes is the number of times to retry when waiting for the token.
defaultWaitRetryTimes = 10
// defaultWaitRetryInterval is the interval to retry when waiting for the token.
defaultWaitRetryInterval = 50 * time.Millisecond
)

const (
Expand Down Expand Up @@ -85,6 +89,12 @@ type Config struct {
// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// WaitRetryInterval is the interval to retry when waiting for the token.
WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"`

// WaitRetryTimes is the number of times to retry when waiting for the token.
WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
Expand All @@ -98,6 +108,8 @@ func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
WaitRetryInterval: NewDuration(defaultWaitRetryInterval),
WaitRetryTimes: defaultWaitRetryTimes,
RequestUnit: DefaultRequestUnitConfig(),
EnableControllerTraceLog: false,
}
Expand Down Expand Up @@ -155,6 +167,8 @@ type RUConfig struct {

// some config for client
LTBMaxWaitDuration time.Duration
WaitRetryInterval time.Duration
WaitRetryTimes int
DegradedModeWaitDuration time.Duration
}

Expand All @@ -176,6 +190,8 @@ func GenerateRUConfig(config *Config) *RUConfig {
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
WaitRetryInterval: config.WaitRetryInterval.Duration,
WaitRetryTimes: config.WaitRetryTimes,
DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration,
}
}
26 changes: 19 additions & 7 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import (

const (
controllerConfigPath = "resource_group/controller"
maxRetry = 10
retryInterval = 50 * time.Millisecond
maxNotificationChanLen = 200
needTokensAmplification = 1.1
trickleReserveDuration = 1250 * time.Millisecond
Expand Down Expand Up @@ -105,6 +103,20 @@ func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
}
}

// WithWaitRetryInterval returns a create option that stores d as the
// controller's ruConfig.WaitRetryInterval, i.e. the retry interval used while
// waiting for the token.
func WithWaitRetryInterval(d time.Duration) ResourceControlCreateOption {
	apply := func(c *ResourceGroupsController) {
		c.ruConfig.WaitRetryInterval = d
	}
	return apply
}

// WithWaitRetryTimes returns a create option that stores times as the
// controller's ruConfig.WaitRetryTimes, i.e. the number of retries allowed
// while waiting for the token.
func WithWaitRetryTimes(times int) ResourceControlCreateOption {
	apply := func(c *ResourceGroupsController) {
		c.ruConfig.WaitRetryTimes = times
	}
	return apply
}

var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil)

// ResourceGroupsController implements ResourceGroupKVInterceptor.
Expand Down Expand Up @@ -186,7 +198,7 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con
log.Warn("[resource group controller] server does not save config, load config failed")
return DefaultConfig(), nil
}
config := &Config{}
config := DefaultConfig()
err = json.Unmarshal(kvs[0].GetValue(), config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -367,7 +379,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
for _, item := range resp {
cfgRevision = item.Kv.ModRevision
config := &Config{}
config := DefaultConfig()
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
continue
}
Expand Down Expand Up @@ -1206,7 +1218,7 @@ func (gc *groupCostController) onRequestWait(
var i int
var d time.Duration
retryLoop:
for i = 0; i < maxRetry; i++ {
for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ {
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
Expand All @@ -1230,8 +1242,8 @@ func (gc *groupCostController) onRequestWait(
}
}
gc.requestRetryCounter.Inc()
time.Sleep(retryInterval)
waitDuration += retryInterval
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
if err != nil {
gc.failedRequestCounter.Inc()
Expand Down
26 changes: 15 additions & 11 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDurtion time.Duration
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDuration time.Duration
// This is the Limit at reservation time, it can change later.
limit Limit
limit Limit
remainingTokens float64
}

// OK returns whether the limiter can provide the requested number of tokens
Expand Down Expand Up @@ -359,10 +360,11 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDurtion: waitDuration,
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDuration: waitDuration,
remainingTokens: tokens,
}
if ok {
r.tokens = n
Expand All @@ -380,6 +382,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
zap.Float64("current-ltb-tokens", lim.tokens),
zap.Float64("current-ltb-rate", float64(lim.limit)),
zap.Float64("request-tokens", n),
zap.Float64("notify-threshold", lim.notifyThreshold),
zap.Bool("is-low-process", lim.isLowProcess),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
lim.last = last
Expand Down Expand Up @@ -461,7 +465,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled
return res.needWaitDuration, errs.ErrClientResourceGroupThrottled.FastGenByArgs(res.needWaitDuration, res.limit, res.remainingTokens)
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestCancel(t *testing.T) {
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(4*time.Second, d)
re.Error(err)
re.Contains(err.Error(), "estimated wait time 4s, ltb state is 1.00:-4.00")
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
cancel1()
Expand Down
2 changes: 0 additions & 2 deletions client/resource_group/controller/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type example struct {
}

func TestDurationJSON(t *testing.T) {
t.Parallel()
re := require.New(t)
example := &example{}

Expand All @@ -41,7 +40,6 @@ func TestDurationJSON(t *testing.T) {
}

func TestDurationTOML(t *testing.T) {
t.Parallel()
re := require.New(t)
example := &example{}

Expand Down
Loading

0 comments on commit 18e96a6

Please sign in to comment.