Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limit connection configuration to queue_service #1882

Merged
merged 3 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@
# ------------------------------------------------------------------------------
EVENTS_QUEUE_URL = env("EVENTS_QUEUE_URL", default=None)
EVENTS_QUEUE_EXCHANGE_NAME = env("EVENTS_QUEUE_EXCHANGE_NAME", default="amq.fanout")
EVENTS_QUEUE_POOL_CONNECTIONS_LIMIT = env.int(
"EVENTS_QUEUE_POOL_CONNECTIONS_LIMIT", default=0
)

# Cache
CACHE_ALL_TXS_VIEW = env.int(
Expand Down
40 changes: 32 additions & 8 deletions safe_transaction_service/events/services/queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,36 +76,56 @@ def get_queue_service():
class QueueService:
def __init__(self):
self._connection_pool: List[BrokerConnection] = []
self._total_connections: int = 0
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
self.unsent_events: List = []

def get_connection(self) -> BrokerConnection:
def get_connection(self) -> Optional[BrokerConnection]:
"""
:return: A `BrokerConnection` from the connection pool if there is one available, othwerwise
returns a new BrokerConnection
"""
if (
settings.EVENTS_QUEUE_POOL_CONNECTIONS_LIMIT
and self._total_connections >= settings.EVENTS_QUEUE_POOL_CONNECTIONS_LIMIT
):
logger.warning(
"Number of active connections reached the pool limit: %d",
self._total_connections,
)
return None

if self._connection_pool:
return self._connection_pool.pop()
broker_connection = self._connection_pool.pop()
else:
return BrokerConnection()
broker_connection = BrokerConnection()

self._total_connections += 1
return broker_connection

def release_connection(self, broker_connection: BrokerConnection) -> None:
def release_connection(self, broker_connection: Optional[BrokerConnection]):
"""
Return the `BrokerConnection` to the pool

:param broker_connection:
:return:
"""
return self._connection_pool.insert(0, broker_connection)
self._total_connections -= 1
# Don't add broken connections to the pool
if broker_connection:
self._connection_pool.insert(0, broker_connection)

def send_event(self, payload: Dict[str, Any]) -> int:
"""
Publish event using the `BrokerConnection`

:param payload: Number of events published
"""
broker_connection = self.get_connection()

event = json.dumps(payload)
if not (broker_connection := self.get_connection()):
# No available connections in the pool, store event to send it later
self.unsent_events.append(event)
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
return 0

if broker_connection.publish(event):
logger.debug("Event correctly sent: %s", event)
self.release_connection(broker_connection)
Expand All @@ -114,6 +134,8 @@ def send_event(self, payload: Dict[str, Any]) -> int:
logger.warning("Unable to send the event due to a connection error")
logger.debug("Adding %s to unsent messages", payload)
self.unsent_events.append(event)
# As the message cannot be sent, we don't want to send the problematic connection back to the pool, only reduce the number of total connections
self.release_connection(None)
moisses89 marked this conversation as resolved.
Show resolved Hide resolved
return 0

def send_unsent_events(self) -> int:
Expand All @@ -125,7 +147,9 @@ def send_unsent_events(self) -> int:
if not self.unsent_events:
return 0

broker_connection = self.get_connection()
if not (broker_connection := self.get_connection()):
# Connection not available in the pool
return 0

# Avoid race conditions
unsent_events = self.unsent_events
Expand Down
41 changes: 36 additions & 5 deletions safe_transaction_service/events/tests/test_queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pika.channel import Channel
from pika.exceptions import ConnectionClosedByBroker

from ..services.queue_service import BrokerConnection, get_queue_service
from ..services.queue_service import BrokerConnection, QueueService, get_queue_service


class TestQueueService(TestCase):
Expand Down Expand Up @@ -58,10 +58,27 @@ def test_send_unsent_messages(self):
_, _, body = broker_connection.channel.basic_get(self.queue, auto_ack=True)
self.assertEqual(json.loads(body), payload)

def test_send_with_pool_limit(self):
queue_service = QueueService()
payload = "Pool limit test"
# Unused connection, just to reach the limit
connection_1 = queue_service.get_connection()
self.assertEqual(len(queue_service.unsent_events), 0)
self.assertEqual(queue_service.send_event(payload), 1)
with self.settings(EVENTS_QUEUE_POOL_CONNECTIONS_LIMIT=1):
self.assertEqual(queue_service._total_connections, 1)
self.assertEqual(len(queue_service.unsent_events), 0)
self.assertEqual(queue_service.send_event(payload), 0)
self.assertEqual(len(queue_service.unsent_events), 1)
queue_service.release_connection(connection_1)
self.assertEqual(len(queue_service.unsent_events), 1)
self.assertEqual(queue_service.send_event(payload), 2)
self.assertEqual(len(queue_service.unsent_events), 0)

def test_send_event_to_queue(self):
payload = {"event": "test_event", "type": "event type"}
queue_service = get_queue_service()
# Clean previous pool connections
queue_service = QueueService()
# Clean previous connection pool
queue_service._connection_pool = []
self.assertEqual(len(queue_service._connection_pool), 0)
queue_service.send_event(payload)
Expand All @@ -72,15 +89,29 @@ def test_send_event_to_queue(self):
self.assertEqual(json.loads(body), payload)

def test_get_connection(self):
queue_service = get_queue_service()
# Clean previous pool connections
queue_service = QueueService()
# Clean previous connection pool
queue_service._connection_pool = []
self.assertEqual(len(queue_service._connection_pool), 0)
self.assertEqual(queue_service._total_connections, 0)
connection_1 = queue_service.get_connection()
self.assertEqual(len(queue_service._connection_pool), 0)
self.assertEqual(queue_service._total_connections, 1)
connection_2 = queue_service.get_connection()
self.assertEqual(len(queue_service._connection_pool), 0)
self.assertEqual(queue_service._total_connections, 2)
queue_service.release_connection(connection_1)
self.assertEqual(len(queue_service._connection_pool), 1)
self.assertEqual(queue_service._total_connections, 1)
queue_service.release_connection(connection_2)
self.assertEqual(len(queue_service._connection_pool), 2)
self.assertEqual(queue_service._total_connections, 0)
with self.settings(EVENTS_QUEUE_POOL_CONNECTIONS_LIMIT=1):
connection_1 = queue_service.get_connection()
self.assertEqual(len(queue_service._connection_pool), 1)
self.assertEqual(queue_service._total_connections, 1)
# We should reach the connection limit of the pool
connection_1 = queue_service.get_connection()
self.assertEqual(len(queue_service._connection_pool), 1)
self.assertEqual(queue_service._total_connections, 1)
self.assertIsNone(connection_1)
Loading