Skip to content

Commit

Permalink
Merge branch 'master' into fix-limit-timefallback
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jul 24, 2024
2 parents b209858 + 0c56739 commit 83023f6
Show file tree
Hide file tree
Showing 70 changed files with 1,394 additions and 652 deletions.
19 changes: 13 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,9 @@ func WithSkipStoreLimit() RegionsOption {

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
allowFollowerHandle bool
needBuckets bool
allowFollowerHandle bool
outputMustContainAllKeyRange bool
}

// GetRegionOption configures GetRegionOp.
Expand All @@ -231,6 +232,11 @@ func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// WithOutputMustContainAllKeyRange means the output must contain all key ranges.
func WithOutputMustContainAllKeyRange() GetRegionOption {
return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true }
}

var (
// errUnmatchedClusterID is returned when found a PD with a different cluster ID.
errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
Expand Down Expand Up @@ -1193,10 +1199,11 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
pbRanges = append(pbRanges, &pdpb.KeyRange{StartKey: r.StartKey, EndKey: r.EndKey})
}
req := &pdpb.BatchScanRegionsRequest{
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
ContainAllKeyRange: options.outputMustContainAllKeyRange,
}
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
8 changes: 6 additions & 2 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,13 @@ var (
cmdFailedDurationUpdateServiceGCSafePoint prometheus.Observer
cmdFailedDurationLoadKeyspace prometheus.Observer
cmdFailedDurationUpdateKeyspaceState prometheus.Observer
requestDurationTSO prometheus.Observer
cmdFailedDurationGet prometheus.Observer
cmdFailedDurationPut prometheus.Observer
cmdFailedDurationUpdateGCSafePointV2 prometheus.Observer
cmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer

requestDurationTSO prometheus.Observer
requestFailedDurationTSO prometheus.Observer
)

func initCmdDurations() {
Expand Down Expand Up @@ -207,11 +209,13 @@ func initCmdDurations() {
cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
cmdFailedDurationLoadKeyspace = cmdFailedDuration.WithLabelValues("load_keyspace")
cmdFailedDurationUpdateKeyspaceState = cmdFailedDuration.WithLabelValues("update_keyspace_state")
requestDurationTSO = requestDuration.WithLabelValues("tso")
cmdFailedDurationGet = cmdFailedDuration.WithLabelValues("get")
cmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put")
cmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2")
cmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2")

requestDurationTSO = requestDuration.WithLabelValues("tso")
requestFailedDurationTSO = requestDuration.WithLabelValues("tso-failed")
}

func registerMetrics() {
Expand Down
123 changes: 84 additions & 39 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,33 +357,38 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if gc, ok := c.loadGroupController(group.Name); ok {
name := group.GetName()
gc, ok := c.loadGroupController(name)
if !ok {
continue
}
if !gc.tombstone.Load() {
gc.modifyMeta(group)
// If the resource group is marked as tombstone before, set it as active again.
if swapped := gc.tombstone.CompareAndSwap(true, false); swapped {
resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1)
log.Info("[resource group controller] mark resource group as active", zap.String("name", group.Name))
}
continue
}
// If the resource group is marked as tombstone before, re-create the resource group controller.
newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed",
zap.String("name", name), zap.Error(err))
continue
}
if c.groupsController.CompareAndSwap(name, gc, newGC) {
log.Info("[resource group controller] re-create resource group cost controller for tombstone",
zap.String("name", name))
}
case meta_storagepb.Event_DELETE:
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
// Do not delete the resource group immediately, just mark it as tombstone.
// For the requests that are still in progress, fallback to the default resource group.
if gc, ok := c.loadGroupController(group.Name); ok {
gc.tombstone.Store(true)
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1)
log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name))
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
if item.PrevKv == nil {
log.Info("[resource group controller] previous key-value pair has been compacted",
zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
continue
}
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
c.tombstoneGroupCostController(group.GetName())
}
}
case resp, ok := <-watchConfigChannel:
Expand Down Expand Up @@ -446,15 +451,23 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g
return tmp.(*groupCostController), loaded
}

// tryGetResourceGroup will try to get the resource group controller from local cache first,
// if the local cache misses, it will then call gRPC to fetch the resource group info from server.
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
// NewResourceGroupNotExistErr returns a new error that indicates the resource group does not exist.
// It's exported for testing.
func NewResourceGroupNotExistErr(name string) error {
return errors.Errorf("%s does not exist", name)
}

// tryGetResourceGroupController will try to get the resource group controller from local cache first.
// If the local cache misses, it will then call gRPC to fetch the resource group info from the remote server.
// If `useTombstone` is true, it will return the resource group controller even if it is marked as tombstone.
func (c *ResourceGroupsController) tryGetResourceGroupController(
ctx context.Context, name string, useTombstone bool,
) (*groupCostController, error) {
// Get from the local cache first.
gc, ok := c.loadGroupController(name)
if ok {
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && name != defaultResourceGroupName {
return c.tryGetResourceGroup(ctx, defaultResourceGroupName)
if !useTombstone && gc.tombstone.Load() {
return nil, NewResourceGroupNotExistErr(name)
}
return gc, nil
}
Expand All @@ -464,7 +477,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return nil, err
}
if group == nil {
return nil, errors.Errorf("%s does not exists", name)
return nil, NewResourceGroupNotExistErr(name)
}
// Check again to prevent initializing the same resource group concurrently.
if gc, ok = c.loadGroupController(name); ok {
Expand All @@ -476,14 +489,51 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return nil, err
}
// Check again to prevent initializing the same resource group concurrently.
gc, loaded := c.loadOrStoreGroupController(group.Name, gc)
gc, loaded := c.loadOrStoreGroupController(name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
log.Info("[resource group controller] create resource group cost controller", zap.String("name", name))
}
return gc, nil
}

// Do not delete the resource group immediately to prevent from interrupting the ongoing request,
// mark it as tombstone and create a default resource group controller for it.
func (c *ResourceGroupsController) tombstoneGroupCostController(name string) {
_, ok := c.loadGroupController(name)
if !ok {
return
}
// The default resource group controller should never be deleted.
if name == defaultResourceGroupName {
return
}
// Try to get the default group meta first.
defaultGC, err := c.tryGetResourceGroupController(c.loopCtx, defaultResourceGroupName, false)
if err != nil || defaultGC == nil {
log.Warn("[resource group controller] get default resource group meta for tombstone failed",
zap.String("name", name), zap.Error(err))
// Directly delete the resource group controller if the default group is not available.
c.groupsController.Delete(name)
return
}
// Create a default resource group controller for the tombstone resource group independently.
gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
log.Warn("[resource group controller] create default resource group cost controller for tombstone failed",
zap.String("name", name), zap.Error(err))
// Directly delete the resource group controller if the default group controller cannot be created.
c.groupsController.Delete(name)
return
}
gc.tombstone.Store(true)
c.groupsController.Store(name, gc)
// Its metrics will be deleted in the cleanup process.
resourceGroupStatusGauge.WithLabelValues(name, name).Set(2)
log.Info("[resource group controller] default resource group controller cost created for tombstone",
zap.String("name", name))
}

func (c *ResourceGroupsController) cleanUpResourceGroup() {
c.groupsController.Range(func(key, value any) bool {
resourceGroupName := key.(string)
Expand All @@ -496,7 +546,6 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
if gc.inactive || gc.tombstone.Load() {
c.groupsController.Delete(resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName)
return true
}
gc.inactive = true
Expand Down Expand Up @@ -589,7 +638,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, true)
if err != nil {
return nil, nil, time.Duration(0), 0, err
}
Expand All @@ -605,17 +654,13 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && resourceGroupName != defaultResourceGroupName {
return c.OnResponse(defaultResourceGroupName, req, resp)
}
return gc.onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
resourceGroupName, requestResource string) bool {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, false)
if err != nil {
return false
}
Expand All @@ -626,7 +671,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool {
// fallback to default resource group.
if bg == nil {
gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
if err != nil {
return false
}
Expand All @@ -646,7 +691,7 @@ func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context,

// GetResourceGroup returns the meta setting of the given resource group name.
func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) {
gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(c.loopCtx, resourceGroupName, false)
if err != nil {
return nil, err
}
Expand Down
60 changes: 36 additions & 24 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)

c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
c1, err := controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
re.Equal(defaultResourceGroup, c1.meta)

c2, err := controller.tryGetResourceGroup(ctx, "test-group")
c2, err := controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
re.Equal(testResourceGroup, c2.meta)

Expand Down Expand Up @@ -271,7 +271,7 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
}
}

func TestGetController(t *testing.T) {
func TestTryGetController(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -286,39 +286,51 @@ func TestGetController(t *testing.T) {
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil)

c, err := controller.GetResourceGroup("test-group-non-existent")
gc, err := controller.tryGetResourceGroupController(ctx, "test-group-non-existent", false)
re.Error(err)
re.Nil(c)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.Nil(gc)
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
c, err = controller.GetResourceGroup("test-group")
re.Equal(defaultResourceGroup, gc.getMeta())
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
re.Equal(testResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{})
re.Equal(testResourceGroup, gc.getMeta())
requestInfo, responseInfo := NewTestRequestInfo(true, 1, 1), NewTestResponseInfo(1, time.Millisecond, true)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo)
re.NoError(err)
_, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{})
consumption, err := controller.OnResponse("test-group", requestInfo, responseInfo)
re.NoError(err)
re.NotEmpty(consumption)
// Mark the tombstone manually to test the fallback case.
gc, err := controller.tryGetResourceGroup(ctx, "test-group")
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup("test-group")
re.NotNil(gc)
controller.tombstoneGroupCostController("test-group")
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false)
re.Error(err)
re.Nil(gc)
gc, err = controller.tryGetResourceGroupController(ctx, "test-group", true)
re.NoError(err)
re.Equal(defaultResourceGroup, gc.getMeta())
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{})
consumption, err = controller.OnResponse("test-group", requestInfo, responseInfo)
re.NoError(err)
_, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{})
re.NotEmpty(consumption)
// Test the default group protection.
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
// Mark the default group tombstone manually to test the fallback case.
gc, err = controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
re.Equal(defaultResourceGroup, gc.getMeta())
controller.tombstoneGroupCostController(defaultResourceGroupName)
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.Equal(defaultResourceGroup, gc.getMeta())
gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, true)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, &TestRequestInfo{})
re.Equal(defaultResourceGroup, gc.getMeta())
_, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, requestInfo)
re.NoError(err)
_, err = controller.OnResponse(defaultResourceGroupName, &TestRequestInfo{}, &TestResponseInfo{})
consumption, err = controller.OnResponse(defaultResourceGroupName, requestInfo, responseInfo)
re.NoError(err)
re.NotEmpty(consumption)
}
Loading

0 comments on commit 83023f6

Please sign in to comment.