Skip to content

Commit

Permalink
Removed global lock from breaker call
Browse files Browse the repository at this point in the history
The global lock was causing performance issues in multithreaded
environments. This change applies the lock in a much more fine-grained
way to solve this issue.
  • Loading branch information
nielsdraaisma authored and Timvde committed Aug 31, 2023
1 parent ccbab3a commit c4db966
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 50 deletions.
44 changes: 26 additions & 18 deletions src/pybreaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ def fail_counter(self) -> int:
"""
Returns the current number of consecutive failures.
"""
return self._state_storage.counter
with self._lock:
return self._state_storage.counter

@property
def fail_max(self) -> int:
Expand Down Expand Up @@ -201,7 +202,8 @@ def current_state(self) -> str:
Returns a string that identifies the state of the circuit breaker as
reported by the _state_storage. i.e., 'closed', 'open', 'half-open'.
"""
return self._state_storage.state
with self._lock:
return self._state_storage.state

@property
def excluded_exceptions(self) -> Tuple[Type[ExceptionType], ...]:
Expand Down Expand Up @@ -236,7 +238,8 @@ def _inc_counter(self) -> None:
"""
Increments the counter of failed calls.
"""
self._state_storage.increment_counter()
with self._lock:
self._state_storage.increment_counter()

def is_system_error(self, exception: ExceptionType) -> bool:
"""
Expand All @@ -259,8 +262,7 @@ def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
Calls `func` with the given `args` and `kwargs` according to the rules
implemented by the current state of this circuit breaker.
"""
with self._lock:
return self.state.call(func, *args, **kwargs)
return self.state.call(func, *args, **kwargs)

def call_async(self, func, *args, **kwargs): # type: ignore[no-untyped-def]
"""
Expand All @@ -272,9 +274,8 @@ def call_async(self, func, *args, **kwargs): # type: ignore[no-untyped-def]

@gen.coroutine
def wrapped(): # type: ignore[no-untyped-def]
with self._lock:
ret = yield self.state.call_async(func, *args, **kwargs)
raise gen.Return(ret)
ret = yield self.state.call_async(func, *args, **kwargs)
raise gen.Return(ret)

return wrapped()

Expand Down Expand Up @@ -441,6 +442,7 @@ def __init__(self, name: str) -> None:
Creates a new instance identified by `name`.
"""
self._name = name
self._count_lock = threading.RLock()

@property
def name(self) -> str:
Expand Down Expand Up @@ -527,20 +529,23 @@ def increment_counter(self) -> None:
"""
Increases the failure counter by one.
"""
self._fail_counter += 1
with self._count_lock:
self._fail_counter += 1

def reset_counter(self) -> None:
"""
Sets the failure counter to zero.
"""
self._fail_counter = 0
with self._count_lock:
self._fail_counter = 0

@property
def counter(self) -> int:
"""
Returns the current value of the failure counter.
"""
return self._fail_counter
with self._count_lock:
return self._fail_counter

@property
def opened_at(self) -> datetime | None:
Expand Down Expand Up @@ -643,7 +648,8 @@ def increment_counter(self) -> None:
Increases the failure counter by one.
"""
try:
self._redis.incr(self._namespace("fail_counter"))
with self._count_lock:
self._redis.incr(self._namespace("fail_counter"))
except RedisError:
self.logger.error("RedisError", exc_info=True)

Expand All @@ -652,7 +658,8 @@ def reset_counter(self) -> None:
Sets the failure counter to zero.
"""
try:
self._redis.set(self._namespace("fail_counter"), 0)
with self._count_lock:
self._redis.set(self._namespace("fail_counter"), 0)
except RedisError:
self.logger.error("RedisError", exc_info=True)

Expand All @@ -662,11 +669,12 @@ def counter(self) -> int:
Returns the current value of the failure counter.
"""
try:
value = self._redis.get(self._namespace("fail_counter"))
if value:
return int(value)
else:
return 0
with self._count_lock:
value = self._redis.get(self._namespace("fail_counter"))
if value:
return int(value)
else:
return 0
except RedisError:
self.logger.error("RedisError: Assuming no errors", exc_info=True)
return 0
Expand Down
56 changes: 24 additions & 32 deletions src/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@
from pybreaker import *


class SuccessListener(CircuitBreakerListener):
def __init__(self):
self.success_count = 0
self.success_count_lock = threading.RLock()

def success(self, cb):
with self.success_count_lock:
sleep(0.00005)
self.success_count = self.success_count + 1


class CircuitBreakerStorageBasedTestCase(object):
"""
Mix in to test against different storage backings. Depends on
Expand Down Expand Up @@ -772,6 +783,7 @@ def test_state_opened_at_not_reset_during_creation(self):

import logging


class CircuitBreakerMultiplexerTestCase(unittest.TestCase):
"""
Tests for the CircuitBreakerMultiplexer class.
Expand All @@ -781,7 +793,7 @@ def setUp(self):
self.breaker = CircuitBreakerMultiplexer(fail_max=1)

def test_positional_argument(self):
@self.breaker(break_on='arg')
@self.breaker(break_on="arg")
def func(arg):
if arg == 1:
raise Exception
Expand All @@ -794,7 +806,7 @@ def func(arg):
self.assertEqual(func(2), True)

def test_keyword_argument(self):
@self.breaker(break_on='arg')
@self.breaker(break_on="arg")
def func(arg):
if arg == 1:
raise Exception
Expand All @@ -807,7 +819,7 @@ def func(arg):
self.assertEqual(func(arg=2), True)

def test_default_argument(self):
@self.breaker(break_on='arg')
@self.breaker(break_on="arg")
def func(arg=1):
if arg == 1:
raise Exception
Expand All @@ -820,7 +832,7 @@ def func(arg=1):
self.assertEqual(func(2), True)

def test_not_first_argument(self):
@self.breaker(break_on='other_arg')
@self.breaker(break_on="other_arg")
def func(arg, other_arg=1):
if other_arg == 1:
raise Exception
Expand All @@ -836,7 +848,7 @@ def func(arg, other_arg=1):
self.assertEqual(func(1, 2), True)

def test_keyword_only(self):
@self.breaker(break_on='other_arg')
@self.breaker(break_on="other_arg")
def func(arg, *positional, other_arg=1):
if other_arg == 1:
raise Exception
Expand Down Expand Up @@ -993,12 +1005,6 @@ def trigger_error():
except SpecificException:
pass

def _inc_counter(self):
c = self._state_storage._fail_counter
sleep(0.00005)
self._state_storage._fail_counter = c + 1

self._mock_function(self.breaker, _inc_counter)
self._start_threads(trigger_error, 3)
self.assertEqual(1500, self.breaker.fail_counter)

Expand All @@ -1015,17 +1021,10 @@ def trigger_success():
for n in range(500):
suc()

class SuccessListener(CircuitBreakerListener):
def success(self, cb):
c = 0
if hasattr(cb, "_success_counter"):
c = cb._success_counter
sleep(0.00005)
cb._success_counter = c + 1

self.breaker.add_listener(SuccessListener())
success_listener = SuccessListener()
self.breaker.add_listener(success_listener)
self._start_threads(trigger_success, 3)
self.assertEqual(1500, self.breaker._success_counter)
self.assertEqual(1500, success_listener.success_count)

def test_half_open_thread_safety(self):
"""CircuitBreaker: it should allow only one trial call when the
Expand Down Expand Up @@ -1085,7 +1084,7 @@ def before_call(self, cb, func, *args, **kwargs):

self.breaker.add_listener(SleepListener())
self._start_threads(trigger_error, 3)
self.assertEqual(self.breaker.fail_max, self.breaker.fail_counter)
self.assertGreater(self.breaker.fail_counter, self.breaker.fail_max)


class CircuitBreakerRedisConcurrencyTestCase(unittest.TestCase):
Expand Down Expand Up @@ -1160,17 +1159,10 @@ def trigger_success():
for n in range(500):
suc()

class SuccessListener(CircuitBreakerListener):
def success(self, cb):
c = 0
if hasattr(cb, "_success_counter"):
c = cb._success_counter
sleep(0.00005)
cb._success_counter = c + 1

self.breaker.add_listener(SuccessListener())
success_listener = SuccessListener()
self.breaker.add_listener(success_listener)
self._start_threads(trigger_success, 3)
self.assertEqual(1500, self.breaker._success_counter)
self.assertEqual(1500, success_listener.success_count)

def test_half_open_thread_safety(self):
"""CircuitBreaker: it should allow only one trial call when the
Expand Down

0 comments on commit c4db966

Please sign in to comment.