Skip to content

Commit

Permalink
open immediately if error rate equals to 0
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Dec 10, 2024
1 parent 28e78e0 commit c1a9697
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
10 changes: 9 additions & 1 deletion client/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
defer cb.mutex.Unlock()

apply(cb.config)
log.Info("circuit breaker settings changed", zap.Any("config", cb.config))
}

// Execute calls the given function if the CircuitBreaker is closed and returns the result of execution.
Expand Down Expand Up @@ -251,6 +252,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
// continue in closed state till ErrorRateWindow is over
return s, nil
case StateOpen:
if s.cb.config.ErrorRateThresholdPct == 0 {
return cb.newState(now, StateClosed), nil
}

if now.After(s.end) {
// CoolDownInterval is over, it is time to transition to half-open state
log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
Expand All @@ -262,7 +267,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
return s, errs.ErrCircuitBreakerOpen
}
case StateHalfOpen:
fmt.Println("StateHalfOpen", s.failureCount, s.successCount, s.pendingCount, s.cb.config.HalfOpenSuccessCount)
if s.cb.config.ErrorRateThresholdPct == 0 {
return cb.newState(now, StateClosed), nil
}

// do we need some expire time here in case of one of pending requests is stuck forever?
if s.failureCount > 0 {
// there were some failures during half-open state, let's go back to open state to wait a bit longer
Expand Down
48 changes: 48 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2101,3 +2101,51 @@ func TestCircuitBreaker(t *testing.T) {
re.NotNil(region)
}
}

func TestCircuitBreakerChangeSettings(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()

circuitBreakerSettings := cb.Settings{
ErrorRateThresholdPct: 60,
MinQPSForOpen: 10,
ErrorRateWindow: time.Millisecond,
CoolDownInterval: time.Second,
HalfOpenSuccessCount: 1,
}

endpoints := runServer(re, cluster)
cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings))
defer cli.Close()

for i := 0; i < 10; i++ {
region, err := cli.GetRegion(context.TODO(), []byte("a"))
re.NoError(err)
re.NotNil(region)
}

re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))

for i := 0; i < 100; i++ {
_, err := cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
}

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "circuit breaker is open")

cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) {
*config = cb.AlwaysClosedSettings
})

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "ResourceExhausted")
re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))
}

0 comments on commit c1a9697

Please sign in to comment.