Skip to content

Commit

Permalink
number of probes supported by the circuit breaker
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 committed Sep 12, 2021
1 parent 671274f commit 93c3872
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 7 deletions.
46 changes: 39 additions & 7 deletions core/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ type circuitBreakerBase struct {
retryTimeoutMs uint32
// nextRetryTimestampMs is the time circuit breaker could probe
nextRetryTimestampMs uint64
// probeNumber is the number of probe requests that are allowed to pass when the circuit breaker is half open.
probeNumber uint64
// curProbeNumber is the real-time probe number.
curProbeNumber uint64
// state is the state machine of circuit breaker
state *State
}
Expand All @@ -156,6 +160,14 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() {
atomic.StoreUint64(&b.nextRetryTimestampMs, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs))
}

func (b *circuitBreakerBase) addCurProbeNum() {
atomic.AddUint64(&b.curProbeNumber, 1)
}

func (b *circuitBreakerBase) resetCurProbeNum() {
atomic.StoreUint64(&b.curProbeNumber, 0)
}

// fromClosedToOpen updates circuit breaker state machine from closed to open.
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool {
Expand Down Expand Up @@ -206,6 +218,7 @@ func (b *circuitBreakerBase) fromOpenToHalfOpen(ctx *base.EntryContext) bool {
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
if b.state.cas(HalfOpen, Open) {
b.resetCurProbeNum()
b.updateNextRetryTimestamp()
for _, listener := range stateChangeListeners {
listener.OnTransformToOpen(HalfOpen, *b.rule, snapshot)
Expand All @@ -221,6 +234,7 @@ func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToClosed() bool {
if b.state.cas(HalfOpen, Closed) {
b.resetCurProbeNum()
for _, listener := range stateChangeListeners {
listener.OnTransformToClosed(HalfOpen, *b.rule)
}
Expand All @@ -247,6 +261,7 @@ func newSlowRtCircuitBreakerWithStat(r *Rule, stat *slowRequestLeapArray) *slowR
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
stat: stat,
maxAllowedRt: r.MaxAllowedRtMs,
Expand Down Expand Up @@ -282,6 +297,8 @@ func (b *slowRtCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -318,9 +335,12 @@ func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, _ error) {
// fail to probe
b.fromHalfOpenToOpen(1.0)
} else {
// succeed to probe
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) == b.probeNumber {
// succeed to probe
b.fromHalfOpenToClosed()
b.resetMetric()
}
}
return
}
Expand Down Expand Up @@ -433,6 +453,7 @@ func newErrorRatioCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
minRequestAmount: r.MinRequestAmount,
errorRatioThreshold: r.Threshold,
Expand Down Expand Up @@ -465,6 +486,8 @@ func (b *errorRatioCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -498,8 +521,11 @@ func (b *errorRatioCircuitBreaker) OnRequestComplete(_ uint64, err error) {
}
if curStatus == HalfOpen {
if err == nil {
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) == b.probeNumber {
b.fromHalfOpenToClosed()
b.resetMetric()
}
} else {
b.fromHalfOpenToOpen(1.0)
}
Expand Down Expand Up @@ -612,6 +638,7 @@ func newErrorCountCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
minRequestAmount: r.MinRequestAmount,
errorCountThreshold: uint64(r.Threshold),
Expand Down Expand Up @@ -644,6 +671,8 @@ func (b *errorCountCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -675,8 +704,11 @@ func (b *errorCountCircuitBreaker) OnRequestComplete(_ uint64, err error) {
}
if curStatus == HalfOpen {
if err == nil {
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) == b.probeNumber {
b.fromHalfOpenToClosed()
b.resetMetric()
}
} else {
b.fromHalfOpenToOpen(1)
}
Expand Down
5 changes: 5 additions & 0 deletions core/circuitbreaker/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type Rule struct {
// for ErrorRatio, it represents the max error request ratio
// for ErrorCount, it represents the max error request count
Threshold float64 `json:"threshold"`
//ProbeNum is number of probes required when the circuit breaker is half-open.
//when the probe num are set and circuit breaker in the half-open state.
//if err occurs during the probe, the circuit breaker is opened immediately.
//otherwise,the circuit breaker is closed only after the number of probes is reached
ProbeNum uint64 `json:"probeNum"`
}

func (r *Rule) String() string {
Expand Down

0 comments on commit 93c3872

Please sign in to comment.