Skip to content

Commit

Permalink
Fix potential nil counter bug in currentBucketOfTime of leap array (#327
Browse files Browse the repository at this point in the history
)

* If the sample count is 1 in leapArray, we guarantee `currentBucketOfTime` returns the bucket.
* Also refine method signature of getting counters in circuit breaker modules.
  • Loading branch information
louyuting authored Nov 23, 2020
1 parent 8d33c7c commit 3a70369
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 26 deletions.
59 changes: 33 additions & 26 deletions core/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,15 @@ func (b *slowRtCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
return false
}

func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, err error) {
func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, _ error) {
// add slow and add total
metricStat := b.stat
counter := metricStat.currentCounter()
counter, curErr := metricStat.currentCounter()
if curErr != nil {
logging.Error(curErr, "Fail to get current counter in slowRtCircuitBreaker#OnRequestComplete().",
"rule", b.rule)
return
}
if rt > b.maxAllowedRt {
atomic.AddUint64(&counter.slowCount, 1)
}
Expand Down Expand Up @@ -339,27 +344,23 @@ func (s *slowRequestLeapArray) ResetBucketTo(bw *sbase.BucketWrap, startTime uin
return bw
}

func (s *slowRequestLeapArray) currentCounter() *slowRequestCounter {
func (s *slowRequestLeapArray) currentCounter() (*slowRequestCounter, error) {
curBucket, err := s.data.CurrentBucket(s)
if err != nil {
logging.Error(err, "Failed to get current bucket in slowRequestLeapArray.currentCounter()")
return nil
return nil, err
}
if curBucket == nil {
logging.Error(errors.New("nil current BucketWrap"), "Current bucket is nil in slowRequestLeapArray.currentCounter()")
return nil
return nil, errors.New("nil BucketWrap")
}
mb := curBucket.Value.Load()
if mb == nil {
logging.Error(errors.New("current bucket atomic Value is nil"), "Current bucket atomic Value is nil in slowRequestLeapArray.currentCounter()")
return nil
return nil, errors.New("nil slowRequestCounter")
}
counter, ok := mb.(*slowRequestCounter)
if !ok {
logging.Error(errors.New("bucket data type error"), "Bucket data type error in slowRequestLeapArray.currentCounter()", "expect type", "*slowRequestCounter", "actual type", reflect.TypeOf(mb).Name())
return nil
return nil, errors.Errorf("bucket fail to do type assert, expect: *slowRequestCounter, in fact: %s", reflect.TypeOf(mb).Name())
}
return counter
return counter, nil
}

func (s *slowRequestLeapArray) allCounter() []*slowRequestCounter {
Expand Down Expand Up @@ -432,9 +433,14 @@ func (b *errorRatioCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
return false
}

func (b *errorRatioCircuitBreaker) OnRequestComplete(rt uint64, err error) {
func (b *errorRatioCircuitBreaker) OnRequestComplete(_ uint64, err error) {
metricStat := b.stat
counter := metricStat.currentCounter()
counter, curErr := metricStat.currentCounter()
if curErr != nil {
logging.Error(curErr, "Fail to get current counter in errorRatioCircuitBreaker#OnRequestComplete().",
"rule", b.rule)
return
}
if err != nil {
atomic.AddUint64(&counter.errorCount, 1)
}
Expand Down Expand Up @@ -516,27 +522,23 @@ func (s *errorCounterLeapArray) ResetBucketTo(bw *sbase.BucketWrap, startTime ui
return bw
}

func (s *errorCounterLeapArray) currentCounter() *errorCounter {
func (s *errorCounterLeapArray) currentCounter() (*errorCounter, error) {
curBucket, err := s.data.CurrentBucket(s)
if err != nil {
logging.Error(err, "Failed to get current bucket in errorCounterLeapArray.currentCounter()")
return nil
return nil, err
}
if curBucket == nil {
logging.Error(errors.New("current bucket is nil"), "Current bucket is nil in errorCounterLeapArray.currentCounter()")
return nil
return nil, errors.New("nil BucketWrap")
}
mb := curBucket.Value.Load()
if mb == nil {
logging.Error(errors.New("current bucket atomic Value is nil"), "Current bucket atomic Value is nil in errorCounterLeapArray.currentCounter()")
return nil
return nil, errors.New("nil errorCounter")
}
counter, ok := mb.(*errorCounter)
if !ok {
logging.Error(errors.New("bucket data type error"), "Bucket data type error in errorCounterLeapArray.currentCounter()", "expect type", "*errorCounter", "actual type", reflect.TypeOf(mb).Name())
return nil
return nil, errors.Errorf("bucket fail to do type assert, expect: *errorCounter, in fact: %s", reflect.TypeOf(mb).Name())
}
return counter
return counter, nil
}

func (s *errorCounterLeapArray) allCounter() []*errorCounter {
Expand Down Expand Up @@ -609,9 +611,14 @@ func (b *errorCountCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
return false
}

func (b *errorCountCircuitBreaker) OnRequestComplete(rt uint64, err error) {
func (b *errorCountCircuitBreaker) OnRequestComplete(_ uint64, err error) {
metricStat := b.stat
counter := metricStat.currentCounter()
counter, curErr := metricStat.currentCounter()
if curErr != nil {
logging.Error(curErr, "Fail to get current counter in errorCountCircuitBreaker#OnRequestComplete().",
"rule", b.rule)
return
}
if err != nil {
atomic.AddUint64(&counter.errorCount, 1)
}
Expand Down
4 changes: 4 additions & 0 deletions core/stat/base/leap_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*Bucke
runtime.Gosched()
}
} else if bucketStart < atomic.LoadUint64(&old.BucketStart) {
if la.sampleCount == 1 {
// if sampleCount==1 in leap array, in concurrency scenario, this case is possible
return old, nil
}
// TODO: reserve for some special case (e.g. when occupying "future" buckets).
return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
}
Expand Down

0 comments on commit 3a70369

Please sign in to comment.