diff --git a/README.md b/README.md index 521ddbf..c5213f2 100644 --- a/README.md +++ b/README.md @@ -316,7 +316,24 @@ Example: } } ``` +##### Degradation: Category=degradation +[JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/#L30) + +|Variable|Introduction| +|------------|----| +| percentage | The percentage of dropped requests| + +Example: +default configuration(false,0) +```json +{ + "degradation": { + "enabled": true, + "percentage": 30 + } +} +``` Note: The circuit breaker implementation of kitex does not currently support changing the global default configuration (see [initServiceCB](https://github.com/cloudwego/kitex/blob/v0.5.1/pkg/circuitbreak/cbsuite.go#L195) for details). ### More Info @@ -374,6 +391,10 @@ For client configuration, you should write all their configurations in the same } } } + }, + "degradation": { + "enabled": true, + "percentage": 30 } } } diff --git a/README_CN.md b/README_CN.md index efa16c0..27fb08e 100644 --- a/README_CN.md +++ b/README_CN.md @@ -315,6 +315,25 @@ echo 方法使用下面的配置(0.3、100),其他方法使用全局默认 } ``` 注:kitex 的熔断实现目前不支持修改全局默认配置(详见 [initServiceCB](https://github.com/cloudwego/kitex/blob/v0.5.1/pkg/circuitbreak/cbsuite.go#L195)) + +##### 降级: Category=degradation + +[JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/#L30) + +|参数|说明| +|----|----| +| percentage | 丢弃比例 | + +样例: +默认配置(false,0) +```json +{ + "degradation": { + "enabled": true, + "percentage": 30 + } +} +``` ### 更多信息 更多示例请参考 [example](https://github.com/kitex-contrib/config-file/tree/main/example) @@ -374,6 +393,10 @@ echo 方法使用下面的配置(0.3、100),其他方法使用全局默认 } } } + }, + "degradation": { + "enabled": true, + "percentage": 30 } } } diff --git a/client/degradation.go b/client/degradation.go new file mode 100644 index 0000000..7afe268 --- /dev/null +++ b/client/degradation.go @@ -0,0 +1,45 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + kitexclient "github.com/cloudwego/kitex/client" + "github.com/kitex-contrib/config-file/monitor" + degradation "github.com/kitex-contrib/config-file/pkg" +) + +func WithDegradation(watcher monitor.ConfigMonitor) []kitexclient.Option { + container, keyDegradation := initDegradationOptions(watcher) + return []kitexclient.Option{ + kitexclient.WithACLRules(container.GetAclRule()), + kitexclient.WithCloseCallbacks(func() error { + watcher.DeregisterCallback(keyDegradation) + return nil + }), + } +} + +func initDegradationOptions(watcher monitor.ConfigMonitor) (*degradation.Container, int64) { + degradationContainer := degradation.NewContainer() + onChangeCallback := func() { + config := getFileConfig(watcher) + if config == nil { + return // config is nil, do nothing, log will be printed in getFileConfig + } + degradationContainer.NotifyPolicyChange(config.Degradation) + } + keyDegradation := watcher.RegisterCallback(onChangeCallback) + return degradationContainer, keyDegradation +} diff --git a/client/suite.go b/client/suite.go index 9878870..7860bee 100644 --- a/client/suite.go +++ b/client/suite.go @@ -48,6 +48,7 @@ func (s *FileConfigClientSuite) Options() []kitexclient.Option { opts = append(opts, WithRetryPolicy(s.watcher)...) opts = append(opts, WithCircuitBreaker(s.service, s.watcher)...) opts = append(opts, WithRPCTimeout(s.watcher)...) + opts = append(opts, WithDegradation(s.watcher)...) opts = append(opts, kitexclient.WithCloseCallbacks(func() error { s.watcher.Stop() return nil diff --git a/example/client/kitex_client.json b/example/client/kitex_client.json index 1d36c0f..8c978df 100644 --- a/example/client/kitex_client.json +++ b/example/client/kitex_client.json @@ -41,6 +41,10 @@ } } } + }, + "degradation": { + "enabled": true, + "percentage": 0 } } } \ No newline at end of file diff --git a/parser/client.go b/parser/client.go index 7b472bb..83f786d 100644 --- a/parser/client.go +++ b/parser/client.go @@ -18,13 +18,15 @@ import ( "github.com/cloudwego/kitex/pkg/circuitbreak" "github.com/cloudwego/kitex/pkg/retry" "github.com/cloudwego/kitex/pkg/rpctimeout" + degradation "github.com/kitex-contrib/config-file/pkg" ) // ClientFileConfig is config of a client/service pair type ClientFileConfig struct { - Timeout map[string]*rpctimeout.RPCTimeout `mapstructure:"timeout"` // key: method, "*" for default - Retry map[string]*retry.Policy `mapstructure:"retry"` // key: method, "*" for default - Circuitbreaker map[string]*circuitbreak.CBConfig `mapstructure:"circuitbreaker"` // key: method + Timeout map[string]*rpctimeout.RPCTimeout `mapstructure:"timeout"` // key: method, "*" for default + Retry map[string]*retry.Policy `mapstructure:"retry"` // key: method, "*" for default + Circuitbreaker map[string]*circuitbreak.CBConfig `mapstructure:"circuitbreaker"` // key: method + Degradation map[string]*degradation.DegradationConfig `mapstructure:"degradation"` } // ClientFileManager is a map of client/service pairs to ClientFileConfig diff --git a/pkg/degradation.go b/pkg/degradation.go new file mode 100644 index 0000000..3da5a63 --- /dev/null +++ b/pkg/degradation.go @@ -0,0 +1,93 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package degradation + +import ( + "context" + "errors" + "sync/atomic" + + "github.com/bytedance/gopkg/lang/fastrand" + "github.com/cloudwego/configmanager/iface" + "github.com/cloudwego/kitex/pkg/acl" +) + +var errRejected = errors.New("rejected by client degradation config") + +var defaultConfig = &DegradationConfig{ + Enable: false, + Percentage: 0, +} + +type DegradationConfig struct { + Enable bool `json:"enabled"` + Percentage int `json:"percentage"` +} + +// DeepCopy returns a copy of the current Config +func (c *DegradationConfig) DeepCopy() iface.ConfigValueItem { + result := &DegradationConfig{ + Enable: c.Enable, + Percentage: c.Percentage, + } + return result +} + +// EqualsTo returns true if the current Config equals to the other Config +func (c *DegradationConfig) EqualsTo(other iface.ConfigValueItem) bool { + o, ok := other.(*DegradationConfig) + if !ok { + return false + } + if c == nil { + return o == nil + } + return c.Enable == o.Enable && c.Percentage == o.Percentage +} + +// Container is a wrapper for Config +type Container struct { + config atomic.Value +} + +func NewContainer() *Container { + c := &Container{} + c.config.Store(defaultConfig) + return c +} + +// NotifyPolicyChange to receive policy when it changes +func (c *Container) NotifyPolicyChange(cfg map[string]*DegradationConfig) { + for _, value := range cfg { + newConfig := &DegradationConfig{ + Enable: value.Enable, + Percentage: value.Percentage, + } + c.config.Store(newConfig) + } +} + +func (c *Container) GetAclRule() acl.RejectFunc { + return func(ctx context.Context, request interface{}) (reason error) { + cfg := c.config.Load().(*DegradationConfig) + if !cfg.Enable { + return nil + } + if fastrand.Intn(100) < cfg.Percentage { + return errRejected + } + return nil + } +} diff --git a/pkg/degradation_test.go b/pkg/degradation_test.go new file mode 100644 index 0000000..1b65bac --- /dev/null +++ b/pkg/degradation_test.go @@ -0,0 +1,46 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package degradation + +import ( + "context" + "errors" + "testing" + + "github.com/cloudwego/kitex/pkg/acl" + "github.com/cloudwego/thriftgo/pkg/test" +) + +var errFake = errors.New("fake error") + +func invoke(ctx context.Context, request, response interface{}) error { + return errFake +} + +func TestNewContainer(t *testing.T) { + container := NewContainer() + cfg1 := map[string]*DegradationConfig{ + "someMethod": {Enable: false, Percentage: 100}, // Example method and config + } + cfg2 := map[string]*DegradationConfig{ + "someMethod": {Enable: true, Percentage: 100}, // Example method and config + } + aclMiddleware := acl.NewACLMiddleware([]acl.RejectFunc{container.GetAclRule()}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake)) + container.NotifyPolicyChange(cfg1) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake)) + container.NotifyPolicyChange(cfg2) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errRejected)) +}