Skip to content

Commit

Permalink
add a client test for circuit breaker
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Dec 10, 2024
1 parent 1e76110 commit 02f5a36
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 28 deletions.
25 changes: 12 additions & 13 deletions client/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ import (
"sync"
"time"

"github.com/tikv/pd/client/errs"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
m "github.com/tikv/pd/client/metrics"
"go.uber.org/zap"

"github.com/pingcap/log"
)

// Overloading is a type describing service return value
Expand Down Expand Up @@ -238,10 +236,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
observedErrorRatePct := s.failureCount * 100 / total
if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct {
// the error threshold is breached, let's move to open state and start failing all requests
log.Error("Circuit breaker tripped. Starting to fail all requests",
log.Error("circuit breaker tripped and starting to fail all requests",
zap.String("name", cb.name),
zap.Uint32("observedErrorRatePct", observedErrorRatePct),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Uint32("observed-err-rate-pct", observedErrorRatePct),
zap.Any("config", cb.config))
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
}
}
Expand All @@ -255,27 +253,28 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
case StateOpen:
if now.After(s.end) {
// CoolDownInterval is over, it is time to transition to half-open state
log.Info("Circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateHalfOpen), nil
} else {
// continue in the open state till CoolDownInterval is over
return s, errs.ErrCircuitBreakerOpen
}
case StateHalfOpen:
fmt.Println("StateHalfOpen", s.failureCount, s.successCount, s.pendingCount, s.cb.config.HalfOpenSuccessCount)
// do we need some expire time here in case of one of pending requests is stuck forever?
if s.failureCount > 0 {
// there were some failures during half-open state, let's go back to open state to wait a bit longer
log.Error("Circuit breaker goes from half-open to open again as errors persist and continue to fail all requests",
log.Error("circuit breaker goes from half-open to open again as errors persist and continue to fail all requests",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
} else if s.successCount == s.cb.config.HalfOpenSuccessCount {
// all probe requests are succeeded, we can move to closed state and allow all requests
log.Info("Circuit breaker is closed. Start allowing all requests",
log.Info("circuit breaker is closed and start allowing all requests",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateClosed), nil
} else if s.pendingCount < s.cb.config.HalfOpenSuccessCount {
// allow more probe requests and continue in half-open state
Expand Down
21 changes: 10 additions & 11 deletions client/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ import (
"testing"
"time"

"github.com/tikv/pd/client/errs"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
)

// advance emulate the state machine clock moves forward by the given duration
Expand All @@ -38,7 +37,7 @@ var settings = Settings{

var minCountToOpen = int(settings.MinQPSForOpen * uint32(settings.ErrorRateWindow.Seconds()))

func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) {
func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
originalError := errors.New("circuit breaker is open")
Expand All @@ -57,7 +56,7 @@ func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) {
re.Equal(42, result)
}

func TestCircuitBreaker_OpenState(t *testing.T) {
func TestCircuitBreakerOpenState(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
driveQPS(cb, minCountToOpen, Yes, re)
Expand All @@ -68,7 +67,7 @@ func TestCircuitBreaker_OpenState(t *testing.T) {
re.Equal(StateOpen, cb.state.stateType)
}

func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) {
func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -78,7 +77,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) {
func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -89,7 +88,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) {
func TestCircuitBreakerHalfOpenToClosed(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -107,7 +106,7 @@ func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) {
func TestCircuitBreakerHalfOpenToOpen(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -130,7 +129,7 @@ func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) {

// in half open state, circuit breaker will allow only HalfOpenSuccessCount pending and should fast fail all other request till HalfOpenSuccessCount requests is completed
// this test moves circuit breaker to the half open state and verifies that requests above HalfOpenSuccessCount are failing
func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) {
func TestCircuitBreakerHalfOpenFailOverPendingCount(t *testing.T) {
re := require.New(t)
cb := newCircuitBreakerMovedToHalfOpenState(re)

Expand Down Expand Up @@ -176,7 +175,7 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) {
re.Equal(uint32(1), cb.state.successCount)
}

func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) {
func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand Down Expand Up @@ -211,7 +210,7 @@ func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) {
re.Equal(uint32(1), cb.state.successCount)
}

func TestCircuitBreaker_ChangeSettings(t *testing.T) {
func TestCircuitBreakerChangeSettings(t *testing.T) {
re := require.New(t)

cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings)
Expand Down
8 changes: 6 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"sync"
"time"

cb "github.com/tikv/pd/client/circuitbreaker"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
cb "github.com/tikv/pd/client/circuitbreaker"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/clients/tso"
Expand All @@ -42,6 +41,8 @@ import (
"github.com/tikv/pd/client/pkg/utils/tlsutil"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// GlobalConfigItem standard format of KV pair in GlobalConfig client
Expand Down Expand Up @@ -660,6 +661,9 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
failpoint.Inject("triggerCircuitBreaker", func() {
err = status.Error(codes.ResourceExhausted, "resource exhausted")
})
return region, isOverloaded(err), err
})
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
Expand Down
2 changes: 1 addition & 1 deletion client/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func initMetrics(constLabels prometheus.Labels) {
Namespace: "pd_client",
Subsystem: "request",
Name: "circuit_breaker_count",
Help: "Circuit Breaker counters",
Help: "Circuit breaker counters",
ConstLabels: constLabels,
}, []string{"name", "success"})
}
Expand Down
56 changes: 55 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
pd "github.com/tikv/pd/client" // 导入 circuitbreaker 包
cb "github.com/tikv/pd/client/circuitbreaker"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
Expand Down Expand Up @@ -2047,3 +2048,56 @@ func needRetry(err error) bool {
}
return st.Code() == codes.ResourceExhausted
}

func TestCircuitBreaker(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()

circuitBreakerSettings := cb.Settings{
ErrorRateThresholdPct: 60,
MinQPSForOpen: 10,
ErrorRateWindow: time.Millisecond,
CoolDownInterval: time.Second,
HalfOpenSuccessCount: 1,
}

endpoints := runServer(re, cluster)
cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings))
defer cli.Close()

for i := 0; i < 10; i++ {
region, err := cli.GetRegion(context.TODO(), []byte("a"))
re.NoError(err)
re.NotNil(region)
}

re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))

for i := 0; i < 100; i++ {
_, err := cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
}

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "circuit breaker is open")
re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "circuit breaker is open")

// wait cooldown
time.Sleep(time.Second)

for i := 0; i < 10; i++ {
region, err := cli.GetRegion(context.TODO(), []byte("a"))
re.NoError(err)
re.NotNil(region)
}
}

0 comments on commit 02f5a36

Please sign in to comment.