From 28e78e0030dd8351e8493ab5d586d3e0d2948807 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 10 Dec 2024 14:15:00 +0800 Subject: [PATCH] add a client test for circuit breaker Signed-off-by: Ryan Leung --- client/circuitbreaker/circuit_breaker.go | 25 +++++---- client/circuitbreaker/circuit_breaker_test.go | 21 ++++---- client/client.go | 8 ++- client/metrics/metrics.go | 2 +- tests/integrations/client/client_test.go | 54 +++++++++++++++++++ 5 files changed, 83 insertions(+), 27 deletions(-) diff --git a/client/circuitbreaker/circuit_breaker.go b/client/circuitbreaker/circuit_breaker.go index b5a4c53ebb5..2b67bcdfcf7 100644 --- a/client/circuitbreaker/circuit_breaker.go +++ b/client/circuitbreaker/circuit_breaker.go @@ -19,13 +19,11 @@ import ( "sync" "time" - "github.com/tikv/pd/client/errs" - + "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/client/errs" m "github.com/tikv/pd/client/metrics" "go.uber.org/zap" - - "github.com/pingcap/log" ) // Overloading is a type describing service return value @@ -238,10 +236,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) { observedErrorRatePct := s.failureCount * 100 / total if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct { // the error threshold is breached, let's move to open state and start failing all requests - log.Error("Circuit breaker tripped. 
Starting to fail all requests", + log.Error("circuit breaker tripped and starting to fail all requests", zap.String("name", cb.name), - zap.Uint32("observedErrorRatePct", observedErrorRatePct), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Uint32("observed-err-rate-pct", observedErrorRatePct), + zap.Any("config", cb.config)) return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen } } @@ -255,27 +253,28 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) { case StateOpen: if now.After(s.end) { // CoolDownInterval is over, it is time to transition to half-open state - log.Info("Circuit breaker cooldown period is over. Transitioning to half-open state to test the service", + log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateHalfOpen), nil } else { // continue in the open state till CoolDownInterval is over return s, errs.ErrCircuitBreakerOpen } case StateHalfOpen: + log.Debug("circuit breaker is in half-open state", zap.String("name", cb.name), zap.String("counters", fmt.Sprintf("failure=%d success=%d pending=%d half-open-success=%d", s.failureCount, s.successCount, s.pendingCount, s.cb.config.HalfOpenSuccessCount))) // do we need some expire time here in case of one of pending requests is stuck forever?
if s.failureCount > 0 { // there were some failures during half-open state, let's go back to open state to wait a bit longer - log.Error("Circuit breaker goes from half-open to open again as errors persist and continue to fail all requests", + log.Error("circuit breaker goes from half-open to open again as errors persist and continue to fail all requests", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen } else if s.successCount == s.cb.config.HalfOpenSuccessCount { // all probe requests are succeeded, we can move to closed state and allow all requests - log.Info("Circuit breaker is closed. Start allowing all requests", + log.Info("circuit breaker is closed and start allowing all requests", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateClosed), nil } else if s.pendingCount < s.cb.config.HalfOpenSuccessCount { // allow more probe requests and continue in half-open state diff --git a/client/circuitbreaker/circuit_breaker_test.go b/client/circuitbreaker/circuit_breaker_test.go index ca77b7f9f99..fdbedd8c690 100644 --- a/client/circuitbreaker/circuit_breaker_test.go +++ b/client/circuitbreaker/circuit_breaker_test.go @@ -18,9 +18,8 @@ import ( "testing" "time" - "github.com/tikv/pd/client/errs" - "github.com/stretchr/testify/require" + "github.com/tikv/pd/client/errs" ) // advance emulate the state machine clock moves forward by the given duration @@ -38,7 +37,7 @@ var settings = Settings{ var minCountToOpen = int(settings.MinQPSForOpen * uint32(settings.ErrorRateWindow.Seconds())) -func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) { +func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) originalError := errors.New("circuit breaker is open") @@ 
-57,7 +56,7 @@ func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) { re.Equal(42, result) } -func TestCircuitBreaker_OpenState(t *testing.T) { +func TestCircuitBreakerOpenState(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) driveQPS(cb, minCountToOpen, Yes, re) @@ -68,7 +67,7 @@ func TestCircuitBreaker_OpenState(t *testing.T) { re.Equal(StateOpen, cb.state.stateType) } -func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) { +func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -78,7 +77,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) { +func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -89,7 +88,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) { +func TestCircuitBreakerHalfOpenToClosed(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -107,7 +106,7 @@ func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) { +func TestCircuitBreakerHalfOpenToOpen(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -130,7 +129,7 @@ func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) { // in half open state, circuit breaker will allow only HalfOpenSuccessCount pending and should fast fail all other request till HalfOpenSuccessCount 
requests is completed // this test moves circuit breaker to the half open state and verifies that requests above HalfOpenSuccessCount are failing -func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) { +func TestCircuitBreakerHalfOpenFailOverPendingCount(t *testing.T) { re := require.New(t) cb := newCircuitBreakerMovedToHalfOpenState(re) @@ -176,7 +175,7 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) { re.Equal(uint32(1), cb.state.successCount) } -func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) { +func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -211,7 +210,7 @@ func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) { re.Equal(uint32(1), cb.state.successCount) } -func TestCircuitBreaker_ChangeSettings(t *testing.T) { +func TestCircuitBreakerChangeSettings(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings) diff --git a/client/client.go b/client/client.go index c271f10591d..0fb3805d76e 100644 --- a/client/client.go +++ b/client/client.go @@ -22,8 +22,6 @@ import ( "sync" "time" - cb "github.com/tikv/pd/client/circuitbreaker" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -31,6 +29,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" + cb "github.com/tikv/pd/client/circuitbreaker" "github.com/tikv/pd/client/clients/metastorage" "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/clients/tso" @@ -42,6 +41,8 @@ import ( "github.com/tikv/pd/client/pkg/utils/tlsutil" sd "github.com/tikv/pd/client/servicediscovery" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // GlobalConfigItem standard format of KV pair in 
GlobalConfig client @@ -660,6 +661,9 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio } resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) { region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req) + failpoint.Inject("triggerCircuitBreaker", func() { + err = status.Error(codes.ResourceExhausted, "resource exhausted") + }) return region, isOverloaded(err), err }) if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { diff --git a/client/metrics/metrics.go b/client/metrics/metrics.go index 3a3199c74a6..67268c826f5 100644 --- a/client/metrics/metrics.go +++ b/client/metrics/metrics.go @@ -152,7 +152,7 @@ func initMetrics(constLabels prometheus.Labels) { Namespace: "pd_client", Subsystem: "request", Name: "circuit_breaker_count", - Help: "Circuit Breaker counters", + Help: "Circuit breaker counters", ConstLabels: constLabels, }, []string{"name", "success"}) } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 0462a6d9ea0..bdc081ed016 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + cb "github.com/tikv/pd/client/circuitbreaker" "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/pkg/caller" @@ -2047,3 +2048,56 @@ func needRetry(err error) bool { } return st.Code() == codes.ResourceExhausted } + +func TestCircuitBreaker(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + circuitBreakerSettings := cb.Settings{ // ErrorRateWindow is under one second, so uint32(ErrorRateWindow.Seconds())*MinQPSForOpen in onRequest is 0 and only the error rate decides when the breaker opens + ErrorRateThresholdPct: 60, + MinQPSForOpen: 10, + ErrorRateWindow: time.Millisecond,
CoolDownInterval: time.Second, + HalfOpenSuccessCount: 1, + } + + endpoints := runServer(re, cluster) + cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) + defer cli.Close() + + for i := 0; i < 10; i++ { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) // from here on the failpoint in GetRegion replaces every RPC result with a ResourceExhausted error + + for i := 0; i < 100; i++ { // drive failures so the breaker trips + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + } + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) + + _, err = cli.GetRegion(context.TODO(), []byte("a")) // the failpoint is off, yet the open breaker still rejects the call + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + + // wait for CoolDownInterval to elapse so the breaker can leave the open state + time.Sleep(time.Second) + + for i := 0; i < 10; i++ { // requests succeed again once the breaker has recovered + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } +}