Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dj_cqrs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ class SignalType:
DEFAULT_REPLICA_MAX_RETRIES = 30
DEFAULT_REPLICA_RETRY_DELAY = 2 # seconds
DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE = 1000

DB_VENDOR_PG = 'postgresql'
DB_VENDOR_MYSQL = 'mysql'
SUPPORTED_TIMEOUT_DB_VENDORS = {DB_VENDOR_MYSQL, DB_VENDOR_PG}

PG_TIMEOUT_FLAG = 'statement timeout'
MYSQL_TIMEOUT_ERROR_CODE = 3024
25 changes: 4 additions & 21 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from django.db import Error, close_old_connections, transaction

from dj_cqrs.constants import SignalType
from dj_cqrs.logger import log_timed_out_queries
from dj_cqrs.registries import ReplicaRegistry
from dj_cqrs.utils import apply_query_timeouts


logger = logging.getLogger('django-cqrs')
Expand Down Expand Up @@ -66,7 +68,7 @@ def route_signal_to_replica_model(
is_meta_supported = model_cls.CQRS_META
try:
if db_is_needed:
_apply_query_timeouts(model_cls)
apply_query_timeouts(model_cls)

with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
if signal_type == SignalType.DELETE:
Expand Down Expand Up @@ -101,23 +103,4 @@ def route_signal_to_replica_model(
),
)


def _apply_query_timeouts(model_cls): # pragma: no cover
query_timeout = int(settings.CQRS['replica'].get('CQRS_QUERY_TIMEOUT', 0))
if query_timeout <= 0:
return

model_db = model_cls._default_manager.db
conn = transaction.get_connection(using=model_db)
conn_vendor = getattr(conn, 'vendor', '')

if conn_vendor not in {'postgresql', 'mysql'}:
return

if conn_vendor == 'postgresql':
statement = 'SET statement_timeout TO %s'
else:
statement = 'SET SESSION MAX_EXECUTION_TIME=%s'

with conn.cursor() as cursor:
cursor.execute(statement, params=(query_timeout,))
log_timed_out_queries(e, model_cls)
60 changes: 60 additions & 0 deletions dj_cqrs/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging

from django.conf import settings
from django.db import OperationalError, transaction

from dj_cqrs.constants import (
DB_VENDOR_MYSQL,
DB_VENDOR_PG,
MYSQL_TIMEOUT_ERROR_CODE,
PG_TIMEOUT_FLAG,
SUPPORTED_TIMEOUT_DB_VENDORS,
)


def install_last_query_capturer(model_cls):
conn = _connection(model_cls)
if not _get_last_query_capturer(conn):
conn.execute_wrappers.append(_LastQueryCaptureWrapper())


def log_timed_out_queries(error, model_cls): # pragma: no cover
log_q = bool(settings.CQRS['replica'].get('CQRS_LOG_TIMED_OUT_QUERIES', False))
if not (log_q and isinstance(error, OperationalError) and error.args):
return

conn = _connection(model_cls)
conn_vendor = getattr(conn, 'vendor', '')
if conn_vendor not in SUPPORTED_TIMEOUT_DB_VENDORS:
return

e_arg = error.args[0]
is_timeout_error = bool(
(conn_vendor == DB_VENDOR_MYSQL and e_arg == MYSQL_TIMEOUT_ERROR_CODE)
or (conn_vendor == DB_VENDOR_PG and isinstance(e_arg, str) and PG_TIMEOUT_FLAG in e_arg)
)
if is_timeout_error:
query = getattr(_get_last_query_capturer(conn), 'query', None)
if query:
logger_name = settings.CQRS['replica'].get('CQRS_QUERY_LOGGER', '') or 'django-cqrs'
logger = logging.getLogger(logger_name)
logger.error('Timed out query:\n%s', query)


class _LastQueryCaptureWrapper:
def __init__(self):
self.query = None

def __call__(self, execute, sql, params, many, context):
try:
execute(sql, params, many, context)
finally:
self.query = sql


def _get_last_query_capturer(conn):
return next((w for w in conn.execute_wrappers if isinstance(w, _LastQueryCaptureWrapper)), None)


def _connection(model_cls):
return transaction.get_connection(using=model_cls._default_manager.db)
26 changes: 26 additions & 0 deletions dj_cqrs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from uuid import UUID

from django.conf import settings
from django.db import transaction
from django.utils import timezone

from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS
from dj_cqrs.logger import install_last_query_capturer


logger = logging.getLogger('django-cqrs')

Expand Down Expand Up @@ -54,3 +58,25 @@ def get_messages_prefetch_count_per_worker():

def get_json_valid_value(value):
return str(value) if isinstance(value, (date, datetime, UUID)) else value


def apply_query_timeouts(model_cls): # pragma: no cover
query_timeout = int(settings.CQRS['replica'].get('CQRS_QUERY_TIMEOUT', 0))
if query_timeout <= 0:
return

model_db = model_cls._default_manager.db
conn = transaction.get_connection(using=model_db)
conn_vendor = getattr(conn, 'vendor', '')
if conn_vendor not in SUPPORTED_TIMEOUT_DB_VENDORS:
return

if conn_vendor == DB_VENDOR_PG:
statement = 'SET statement_timeout TO %s'
else:
statement = 'SET SESSION MAX_EXECUTION_TIME=%s'

with conn.cursor() as cursor:
cursor.execute(statement, params=(query_timeout,))

install_last_query_capturer(model_cls)
2 changes: 2 additions & 0 deletions integration_tests/replica_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
'delay_queue_max_size': 50,
'dead_letter_queue': 'dead_letter_replica',
'dead_message_ttl': 5,
'CQRS_QUERY_TIMEOUT': 10000,
'CQRS_LOG_TIMED_OUT_QUERIES': 1,
},
}

Expand Down
146 changes: 146 additions & 0 deletions tests/test_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import logging

import pytest
from django.db import (
DatabaseError,
IntegrityError,
OperationalError,
connection,
)

from dj_cqrs.logger import (
_LastQueryCaptureWrapper,
install_last_query_capturer,
log_timed_out_queries,
)
from tests.dj_replica import models


@pytest.mark.django_db(transaction=True)
def test_install_last_query_capturer():
for _ in range(2):
install_last_query_capturer(models.AuthorRef)

assert len(connection.execute_wrappers) == 1
assert isinstance(connection.execute_wrappers[0], _LastQueryCaptureWrapper)

with connection.cursor() as c:
c.execute('SELECT 1')

assert connection.execute_wrappers[0].query == 'SELECT 1'

connection.execute_wrappers.pop()


def test_log_timed_out_queries_not_supported(caplog):
assert log_timed_out_queries(None, None) is None
assert not caplog.record_tuples


@pytest.mark.parametrize(
'error',
[
IntegrityError('some error'),
DatabaseError(),
OperationalError(),
],
)
def test_log_timed_out_queries_other_error(error, settings, caplog):
settings.CQRS_LOG_TIMED_OUT_QUERIES = 1

assert log_timed_out_queries(error, None) is None
assert not caplog.record_tuples


@pytest.mark.django_db(transaction=True)
@pytest.mark.parametrize(
'engine, error, l_name, records',
[
('sqlite', None, None, []),
(
'postgres',
OperationalError('canceling statement due to statement timeout'),
None,
[
(
'django-cqrs',
logging.ERROR,
'Timed out query:\nSELECT 1',
)
],
),
(
'postgres',
OperationalError('canceling statement due to statement timeout'),
'long-query',
[
(
'long-query',
logging.ERROR,
'Timed out query:\nSELECT 1',
)
],
),
(
'postgres',
OperationalError('could not connect to server'),
None,
[],
),
(
'postgres',
OperationalError(125, 'Some error'),
None,
[],
),
(
'mysql',
OperationalError(3024),
None,
[
(
'django-cqrs',
logging.ERROR,
'Timed out query:\nSELECT 1',
)
],
),
(
'mysql',
OperationalError(
3024, 'Query exec was interrupted, max statement execution time exceeded'
),
'long-query-1',
[
(
'long-query-1',
logging.ERROR,
'Timed out query:\nSELECT 1',
)
],
),
(
'mysql',
OperationalError(1040, 'Too many connections'),
None,
[],
),
],
)
def test_apply_query_timeouts(settings, engine, l_name, error, records, caplog):
if settings.DB_ENGINE != engine:
return

settings.CQRS['replica']['CQRS_LOG_TIMED_OUT_QUERIES'] = True
settings.CQRS['replica']['CQRS_QUERY_LOGGER'] = l_name

model_cls = models.BasicFieldsModelRef
install_last_query_capturer(model_cls)

with connection.cursor() as c:
c.execute('SELECT 1')

assert log_timed_out_queries(error, model_cls) is None
assert caplog.record_tuples == records

connection.execute_wrappers.pop()
23 changes: 23 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
timedelta,
timezone,
)
from unittest.mock import patch
from uuid import UUID

import pytest

from dj_cqrs.utils import (
apply_query_timeouts,
get_delay_queue_max_size,
get_json_valid_value,
get_message_expiration_dt,
get_messages_prefetch_count_per_worker,
)
from tests.dj_replica import models


def test_get_message_expiration_dt_fixed(mocker, settings):
Expand Down Expand Up @@ -86,3 +89,23 @@ def test_get_messaged_prefetch_count_per_worker_with_delay_queue(settings):
)
def test_get_json_valid_value(value, result):
assert get_json_valid_value(value) == result


@pytest.mark.django_db
@pytest.mark.parametrize(
'engine, p_count',
[
('sqlite', 0),
('postgres', 1),
('mysql', 1),
],
)
def test_apply_query_timeouts(settings, engine, p_count):
if settings.DB_ENGINE != engine:
return

settings.CQRS['replica']['CQRS_QUERY_TIMEOUT'] = 1
with patch('dj_cqrs.utils.install_last_query_capturer') as p:
assert apply_query_timeouts(models.BasicFieldsModelRef) is None

assert p.call_count == p_count