Skip to content

Commit

Permalink
Fix throtteling queue increases progressively (#658)
Browse files Browse the repository at this point in the history
* fix increase of wait time
* update changelog
* separate decode error handling from unexpected error handling
* use libc.usleep for better precision
* add tests for throttleling queue
  • Loading branch information
ekneg54 authored Aug 29, 2024
1 parent 16257ca commit 086d8e9
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
### Improvements
### Bugfix

* fixes a bug not increasing but decreasing timeout throttle factor of ThrottlingQueue
* handle DecodeError and unexpected Exceptions on requests in `http_input` separately

## 13.1.1
### Improvements

Expand Down
9 changes: 8 additions & 1 deletion logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
from falcon import ( # pylint: disable=no-name-in-module
HTTP_200,
HTTPBadRequest,
HTTPInternalServerError,
HTTPMethodNotAllowed,
HTTPTooManyRequests,
HTTPUnauthorized,
Expand Down Expand Up @@ -145,8 +146,14 @@ async def func_wrapper(*args, **kwargs):
raise error from error
except queue.Full as error:
raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from error
except msgspec.DecodeError as error:
raise HTTPBadRequest(
description=f"Can't decode message due to: {str(error)}"
) from error
except Exception as error: # pylint: disable=broad-except
raise HTTPBadRequest(str(error)) from error
raise HTTPInternalServerError(
description=f"Unexpected Exception: {str(error)}"
) from error
return func_wrapper

return func_wrapper
Expand Down
19 changes: 13 additions & 6 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# pylint: disable=logging-fstring-interpolation

import ctypes
import logging
import logging.handlers
import multiprocessing
Expand All @@ -20,18 +21,20 @@
from logprep.util.configuration import Configuration
from logprep.util.logging import LogprepMPQueueListener, logqueue

libc = ctypes.CDLL("libc.so.6")

logger = logging.getLogger("Manager")


class ThrottlingQueue(multiprocessing.queues.Queue):
"""A queue that throttles the number of items that can be put into it."""

wait_time = 0.0000000000000001
wait_time = 5

@property
def consumed_percent(self):
def consumed_percent(self) -> int:
"""Return the percentage of items consumed."""
return self.qsize() / self.capacity
return int((self.qsize() / self.capacity) * 100)

def __init__(self, ctx, maxsize):
super().__init__(ctx=ctx, maxsize=maxsize)
Expand All @@ -40,12 +43,16 @@ def __init__(self, ctx, maxsize):

def throttle(self, batch_size=1):
"""Throttle put by sleeping."""
time.sleep((self.wait_time**self.consumed_percent) / batch_size)
if self.consumed_percent > 90:
sleep_time = max(
self.wait_time, int(self.wait_time * self.consumed_percent / batch_size)
)
# sleep times in microseconds
libc.usleep(sleep_time)

def put(self, obj, block=True, timeout=None, batch_size=1):
"""Put an obj into the queue."""
if self.consumed_percent >= 0.9:
self.throttle(batch_size)
self.throttle(batch_size)
super().put(obj, block=block, timeout=timeout)


Expand Down
41 changes: 40 additions & 1 deletion tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# pylint: disable=missing-docstring
# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init
import multiprocessing
from copy import deepcopy
from logging import Logger
from logging.config import dictConfig
from unittest import mock

from logprep.connector.http.input import HttpInput
from logprep.factory import Factory
from logprep.framework.pipeline_manager import PipelineManager
from logprep.framework.pipeline_manager import PipelineManager, ThrottlingQueue
from logprep.metrics.exporter import PrometheusExporter
from logprep.util.configuration import Configuration, MetricsConfig
from logprep.util.defaults import DEFAULT_LOG_CONFIG
Expand Down Expand Up @@ -254,3 +255,41 @@ def test_restart_failed_pipeline_restarts_immediately_on_negative_restart_count_
pipeline_manager._pipelines[0].is_alive.return_value = False
pipeline_manager.restart_failed_pipeline()
mock_time_sleep.assert_not_called()


class TestThrottlingQueue:

def test_throttling_queue_is_multiprocessing_queue(self):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
assert isinstance(queue, ThrottlingQueue)
assert isinstance(queue, multiprocessing.queues.Queue)

def test_throttling_put_calls_parent(self):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
with mock.patch.object(multiprocessing.queues.Queue, "put") as mock_put:
queue.put("test")
mock_put.assert_called_with("test", block=True, timeout=None)

def test_throttling_put_throttles(self):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
with mock.patch.object(queue, "throttle") as mock_throttle:
queue.put("test")
mock_throttle.assert_called()

def test_throttle_sleeps(self):
with mock.patch("logprep.framework.pipeline_manager.libc.usleep") as mock_sleep:
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
with mock.patch.object(queue, "qsize", return_value=95):
queue.throttle()
mock_sleep.assert_called()

def test_throttle_sleep_time_increases_with_qsize(self):
with mock.patch("logprep.framework.pipeline_manager.libc.usleep") as mock_sleep:
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
with mock.patch.object(queue, "qsize", return_value=91):
queue.throttle()
first_sleep_time = mock_sleep.call_args[0][0]
mock_sleep.reset_mock()
with mock.patch.object(queue, "qsize", return_value=95):
queue.throttle()
assert mock_sleep.call_args[0][0] > first_sleep_time

0 comments on commit 086d8e9

Please sign in to comment.