From fd085311154ac64dd0a63c9ab111b86ee4d4e8e1 Mon Sep 17 00:00:00 2001 From: Anentropic Date: Wed, 17 Oct 2018 11:44:45 +0100 Subject: [PATCH 1/3] refactor so that StorageBase contains the actual token algorithm --- token_bucket/storage.py | 152 +-------------------------------- token_bucket/storage_base.py | 160 ++++++++++++++++++++++++++++++++++- 2 files changed, 160 insertions(+), 152 deletions(-) diff --git a/token_bucket/storage.py b/token_bucket/storage.py index 07cf7fa..1cf4bd6 100644 --- a/token_bucket/storage.py +++ b/token_bucket/storage.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time - from .storage_base import StorageBase @@ -28,152 +26,4 @@ class MemoryStorage(StorageBase): the case that it is, the situation can be remedied by simply increasing the bucket capacity by a few tokens. """ - - def __init__(self): - self._buckets = {} - - def get_token_count(self, key): - """Query the current token count for the given bucket. - - Note that the bucket is not replenished first, so the count - will be what it was the last time replenish() was called. - - Args: - key (str): Name of the bucket to query. - - Returns: - float: Number of tokens currently in the bucket (may be - fractional). - """ - try: - return self._buckets[key][0] - except KeyError: - pass - - return 0 - - def replenish(self, key, rate, capacity): - """Add tokens to a bucket per the given rate. - - This method is exposed for use by the token_bucket.Limiter - class. - """ - - try: - # NOTE(kgriffs): Correctness of this algorithm assumes - # that the calculation of the current time is performed - # in the same order as the updates based on that - # timestamp, across all threads. If an older "now" - # completes before a newer "now", the lower token - # count will overwrite the newer, effectively reducing - # the bucket's capacity temporarily, by a minor amount. - # - # While a lock could be used to fix this race condition, - # one isn't used here for the following reasons: - # - # 1. The condition above will rarely occur, since - # the window of opportunity is quite small and - # even so requires many threads contending for a - # relatively small number of bucket keys. - # 2. When the condition does occur, the difference - # in timestamps will be quite small, resulting in - # a negligible loss in tokens. - # 3. Depending on the order in which instructions - # are interleaved between threads, the condition - # can be detected and mitigated by comparing - # timestamps. This mitigation is implemented below, - # and serves to further minimize the effect of this - # race condition to negligible levels. - # 4. While locking introduces only a small amount of - # overhead (less than a microsecond), there's no - # reason to waste those CPU cycles in light of the - # points above. - # 5. If a lock were used, it would only be held for - # a microsecond or less. We are unlikely to see - # much contention for the lock during such a short - # time window, but we might as well remove the - # possibility in light of the points above. - - tokens_in_bucket, last_replenished_at = self._buckets[key] - - now = time.time() - - # NOTE(kgriffs): This will detect many, but not all, - # manifestations of the race condition. If a later - # timestamp was already used to update the bucket, don't - # regress by setting the token count to a smaller number. - if now < last_replenished_at: # pragma: no cover - return - - self._buckets[key] = [ - # Limit to capacity - min( - capacity, - - # NOTE(kgriffs): The new value is the current number - # of tokens in the bucket plus the number of - # tokens generated since last time. Fractional - # tokens are permitted in order to improve - # accuracy (now is a float, and rate may be also). - tokens_in_bucket + (rate * (now - last_replenished_at)) - ), - - # Update the timestamp for use next time - now - ] - - except KeyError: - self._buckets[key] = [capacity, time.time()] - - def consume(self, key, num_tokens): - """Attempt to take one or more tokens from a bucket. - - This method is exposed for use by the token_bucket.Limiter - class. - """ - - # NOTE(kgriffs): Assume that the key will be present, since - # replenish() will always be called before consume(). - tokens_in_bucket = self._buckets[key][0] - if tokens_in_bucket < num_tokens: - return False - - # NOTE(kgriffs): In a multi-threaded application, it is - # possible for two threads to interleave such that they - # both pass the check above, while in reality if executed - # linearly, the second thread would not pass the check - # since the first thread was able to consume the remaining - # tokens in the bucket. - # - # When this race condition occurs, the count in the bucket - # will go negative, effectively resulting in a slight - # reduction in capacity. - # - # While a lock could be used to fix this race condition, - # one isn't used here for the following reasons: - # - # 1. The condition above will rarely occur, since - # the window of opportunity is quite small. - # 2. When the condition does occur, the tokens will - # usually be quickly replenished since the rate tends - # to be much larger relative to the number of tokens - # that are consumed by any one request, and due to (1) - # the condition is very rarely likely to happen - # multiple times in a row. - # 3. In the case of bursting across a large number of - # threads, the likelihood for this race condition - # will increase. Even so, the burst will be quickly - # negated as requests become non-conforming, allowing - # the bucket to be replenished. - # 4. While locking introduces only a small amount of - # overhead (less than a microsecond), there's no - # reason to waste those CPU cycles in light of the - # points above. - # 5. If a lock were used, it would only be held for - # less than a microsecond. We are unlikely to see - # much contention for the lock during such a short - # time window, but we might as well remove the - # possibility given the points above. - - self._buckets[key][0] -= num_tokens - return True + bucket_provider_cls = dict diff --git a/token_bucket/storage_base.py b/token_bucket/storage_base.py index e64abdf..a23ce30 100644 --- a/token_bucket/storage_base.py +++ b/token_bucket/storage_base.py @@ -13,9 +13,10 @@ # limitations under the License. import abc +import time -class StorageBase(object): +class AbstractStorage(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod @@ -69,3 +70,160 @@ def consume(self, key, num_tokens): from the bucket (conforming), otherwise False (non- conforming). """ + + +class StorageBase(AbstractStorage): + __metaclass__ = abc.ABCMeta + + # any object that implements __getitem__, __setitem__ + # (and should raise KeyError from __getitem__ if key not found) + bucket_provider_cls = dict + + def __init__(self): + self._buckets = self.bucket_provider_cls() + + def get_token_count(self, key): + """Query the current token count for the given bucket. + + Note that the bucket is not replenished first, so the count + will be what it was the last time replenish() was called. + + Args: + key (str): Name of the bucket to query. + + Returns: + float: Number of tokens currently in the bucket (may be + fractional). + """ + try: + return self._buckets[key][0] + except KeyError: + pass + + return 0 + + def replenish(self, key, rate, capacity): + """Add tokens to a bucket per the given rate. + + This method is exposed for use by the token_bucket.Limiter + class. + """ + + try: + # NOTE(kgriffs): Correctness of this algorithm assumes + # that the calculation of the current time is performed + # in the same order as the updates based on that + # timestamp, across all threads. If an older "now" + # completes before a newer "now", the lower token + # count will overwrite the newer, effectively reducing + # the bucket's capacity temporarily, by a minor amount. + # + # While a lock could be used to fix this race condition, + # one isn't used here for the following reasons: + # + # 1. The condition above will rarely occur, since + # the window of opportunity is quite small and + # even so requires many threads contending for a + # relatively small number of bucket keys. + # 2. When the condition does occur, the difference + # in timestamps will be quite small, resulting in + # a negligible loss in tokens. + # 3. Depending on the order in which instructions + # are interleaved between threads, the condition + # can be detected and mitigated by comparing + # timestamps. This mitigation is implemented below, + # and serves to further minimize the effect of this + # race condition to negligible levels. + # 4. While locking introduces only a small amount of + # overhead (less than a microsecond), there's no + # reason to waste those CPU cycles in light of the + # points above. + # 5. If a lock were used, it would only be held for + # a microsecond or less. We are unlikely to see + # much contention for the lock during such a short + # time window, but we might as well remove the + # possibility in light of the points above. + + tokens_in_bucket, last_replenished_at = self._buckets[key] + + now = time.time() + + # NOTE(kgriffs): This will detect many, but not all, + # manifestations of the race condition. If a later + # timestamp was already used to update the bucket, don't + # regress by setting the token count to a smaller number. + if now < last_replenished_at: # pragma: no cover + return + + self._buckets[key] = [ + # Limit to capacity + min( + capacity, + + # NOTE(kgriffs): The new value is the current number + # of tokens in the bucket plus the number of + # tokens generated since last time. Fractional + # tokens are permitted in order to improve + # accuracy (now is a float, and rate may be also). + tokens_in_bucket + (rate * (now - last_replenished_at)) + ), + + # Update the timestamp for use next time + now + ] + + except KeyError: + self._buckets[key] = [capacity, time.time()] + + def consume(self, key, num_tokens): + """Attempt to take one or more tokens from a bucket. + + This method is exposed for use by the token_bucket.Limiter + class. + """ + + # NOTE(kgriffs): Assume that the key will be present, since + # replenish() will always be called before consume(). + tokens_in_bucket = self._buckets[key][0] + if tokens_in_bucket < num_tokens: + return False + + # NOTE(kgriffs): In a multi-threaded application, it is + # possible for two threads to interleave such that they + # both pass the check above, while in reality if executed + # linearly, the second thread would not pass the check + # since the first thread was able to consume the remaining + # tokens in the bucket. + # + # When this race condition occurs, the count in the bucket + # will go negative, effectively resulting in a slight + # reduction in capacity. + # + # While a lock could be used to fix this race condition, + # one isn't used here for the following reasons: + # + # 1. The condition above will rarely occur, since + # the window of opportunity is quite small. + # 2. When the condition does occur, the tokens will + # usually be quickly replenished since the rate tends + # to be much larger relative to the number of tokens + # that are consumed by any one request, and due to (1) + # the condition is very rarely likely to happen + # multiple times in a row. + # 3. In the case of bursting across a large number of + # threads, the likelihood for this race condition + # will increase. Even so, the burst will be quickly + # negated as requests become non-conforming, allowing + # the bucket to be replenished. + # 4. While locking introduces only a small amount of + # overhead (less than a microsecond), there's no + # reason to waste those CPU cycles in light of the + # points above. + # 5. If a lock were used, it would only be held for + # less than a microsecond. We are unlikely to see + # much contention for the lock during such a short + # time window, but we might as well remove the + # possibility given the points above. + + self._buckets[key][0] -= num_tokens + return True From 049c0eed64cc9855bee77360ff23265d032c68dd Mon Sep 17 00:00:00 2001 From: Anentropic Date: Fri, 19 Oct 2018 11:39:32 +0100 Subject: [PATCH 2/3] make bucket objects immutable (helps implementing non-dict bucket providers) --- token_bucket/storage_base.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/token_bucket/storage_base.py b/token_bucket/storage_base.py index a23ce30..d995ec1 100644 --- a/token_bucket/storage_base.py +++ b/token_bucket/storage_base.py @@ -155,7 +155,7 @@ def replenish(self, key, rate, capacity): if now < last_replenished_at: # pragma: no cover return - self._buckets[key] = [ + self._buckets[key] = ( # Limit to capacity min( capacity, @@ -170,10 +170,10 @@ def replenish(self, key, rate, capacity): # Update the timestamp for use next time now - ] + ) except KeyError: - self._buckets[key] = [capacity, time.time()] + self._buckets[key] = (capacity, time.time()) def consume(self, key, num_tokens): """Attempt to take one or more tokens from a bucket. @@ -184,7 +184,7 @@ def consume(self, key, num_tokens): # NOTE(kgriffs): Assume that the key will be present, since # replenish() will always be called before consume(). - tokens_in_bucket = self._buckets[key][0] + tokens_in_bucket, timestamp = self._buckets[key] if tokens_in_bucket < num_tokens: return False @@ -225,5 +225,5 @@ def consume(self, key, num_tokens): # time window, but we might as well remove the # possibility given the points above. - self._buckets[key][0] -= num_tokens + self._buckets[key] = (tokens_in_bucket - num_tokens, timestamp) return True From 158324f12867f2109ca97e0563147a7d1052cd56 Mon Sep 17 00:00:00 2001 From: Anentropic Date: Fri, 19 Oct 2018 12:30:04 +0100 Subject: [PATCH 3/3] prefer tight try/except clause --- token_bucket/storage_base.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/token_bucket/storage_base.py b/token_bucket/storage_base.py index d995ec1..8ed5f7c 100644 --- a/token_bucket/storage_base.py +++ b/token_bucket/storage_base.py @@ -145,7 +145,9 @@ def replenish(self, key, rate, capacity): # possibility in light of the points above. tokens_in_bucket, last_replenished_at = self._buckets[key] - + except KeyError: + self._buckets[key] = (capacity, time.time()) + else: now = time.time() # NOTE(kgriffs): This will detect many, but not all, @@ -172,9 +174,6 @@ def replenish(self, key, rate, capacity): now ) - except KeyError: - self._buckets[key] = (capacity, time.time()) - def consume(self, key, num_tokens): """Attempt to take one or more tokens from a bucket.