From 7fffd70bb8b64b46e691a0f76ceae0c6929fc86c Mon Sep 17 00:00:00 2001
From: Maxim Kolyubyakin
Date: Thu, 29 Jun 2023 08:35:27 +0200
Subject: [PATCH] LITE-27792 Support for logging of timed out PG and MySQL
 queries

---
 dj_cqrs/constants.py                  |   7 ++
 dj_cqrs/controller/consumer.py        |  25 +----
 dj_cqrs/logger.py                     |  60 +++++++++++
 dj_cqrs/utils.py                      |  26 +++++
 integration_tests/replica_settings.py |   2 +
 tests/test_logger.py                  | 146 ++++++++++++++++++++++++++
 tests/test_utils.py                   |  23 ++++
 7 files changed, 268 insertions(+), 21 deletions(-)
 create mode 100644 dj_cqrs/logger.py
 create mode 100644 tests/test_logger.py

diff --git a/dj_cqrs/constants.py b/dj_cqrs/constants.py
index 3069da3..ed05863 100644
--- a/dj_cqrs/constants.py
+++ b/dj_cqrs/constants.py
@@ -29,3 +29,10 @@ class SignalType:
 DEFAULT_REPLICA_MAX_RETRIES = 30
 DEFAULT_REPLICA_RETRY_DELAY = 2  # seconds
 DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE = 1000
+
+DB_VENDOR_PG = 'postgresql'
+DB_VENDOR_MYSQL = 'mysql'
+SUPPORTED_TIMEOUT_DB_VENDORS = {DB_VENDOR_MYSQL, DB_VENDOR_PG}
+
+PG_TIMEOUT_FLAG = 'statement timeout'
+MYSQL_TIMEOUT_ERROR_CODE = 3024
diff --git a/dj_cqrs/controller/consumer.py b/dj_cqrs/controller/consumer.py
index e5a7536..e2bce8c 100644
--- a/dj_cqrs/controller/consumer.py
+++ b/dj_cqrs/controller/consumer.py
@@ -8,7 +8,9 @@
 from django.db import Error, close_old_connections, transaction
 
 from dj_cqrs.constants import SignalType
+from dj_cqrs.logger import log_timed_out_queries
 from dj_cqrs.registries import ReplicaRegistry
+from dj_cqrs.utils import apply_query_timeouts
 
 
 logger = logging.getLogger('django-cqrs')
@@ -66,7 +68,7 @@
     is_meta_supported = model_cls.CQRS_META
     try:
         if db_is_needed:
-            _apply_query_timeouts(model_cls)
+            apply_query_timeouts(model_cls)
 
         with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
             if signal_type == SignalType.DELETE:
@@ -101,23 +103,4 @@
             ),
         )
 
-
-def _apply_query_timeouts(model_cls):  # pragma: no cover
-    query_timeout = int(settings.CQRS['replica'].get('CQRS_QUERY_TIMEOUT', 0))
-    if query_timeout <= 0:
-        return
-
-    model_db = model_cls._default_manager.db
-    conn = transaction.get_connection(using=model_db)
-    conn_vendor = getattr(conn, 'vendor', '')
-
-    if conn_vendor not in {'postgresql', 'mysql'}:
-        return
-
-    if conn_vendor == 'postgresql':
-        statement = 'SET statement_timeout TO %s'
-    else:
-        statement = 'SET SESSION MAX_EXECUTION_TIME=%s'
-
-    with conn.cursor() as cursor:
-        cursor.execute(statement, params=(query_timeout,))
+        log_timed_out_queries(e, model_cls)
diff --git a/dj_cqrs/logger.py b/dj_cqrs/logger.py
new file mode 100644
index 0000000..b5d8d58
--- /dev/null
+++ b/dj_cqrs/logger.py
@@ -0,0 +1,60 @@
+import logging
+
+from django.conf import settings
+from django.db import OperationalError, transaction
+
+from dj_cqrs.constants import (
+    DB_VENDOR_MYSQL,
+    DB_VENDOR_PG,
+    MYSQL_TIMEOUT_ERROR_CODE,
+    PG_TIMEOUT_FLAG,
+    SUPPORTED_TIMEOUT_DB_VENDORS,
+)
+
+
+def install_last_query_capturer(model_cls):
+    conn = _connection(model_cls)
+    if not _get_last_query_capturer(conn):
+        conn.execute_wrappers.append(_LastQueryCaptureWrapper())
+
+
+def log_timed_out_queries(error, model_cls):  # pragma: no cover
+    log_q = bool(settings.CQRS['replica'].get('CQRS_LOG_TIMED_OUT_QUERIES', False))
+    if not (log_q and isinstance(error, OperationalError) and error.args):
+        return
+
+    conn = _connection(model_cls)
+    conn_vendor = getattr(conn, 'vendor', '')
+    if conn_vendor not in SUPPORTED_TIMEOUT_DB_VENDORS:
+        return
+
+    e_arg = error.args[0]
+    is_timeout_error = bool(
+        (conn_vendor == DB_VENDOR_MYSQL and e_arg == MYSQL_TIMEOUT_ERROR_CODE)
+        or (conn_vendor == DB_VENDOR_PG and isinstance(e_arg, str) and PG_TIMEOUT_FLAG in e_arg)
+    )
+    if is_timeout_error:
+        query = getattr(_get_last_query_capturer(conn), 'query', None)
+        if query:
+            logger_name = settings.CQRS['replica'].get('CQRS_QUERY_LOGGER', '') or 'django-cqrs'
+            logger = logging.getLogger(logger_name)
+            logger.error('Timed out query:\n%s', query)
+
+
+class _LastQueryCaptureWrapper:
+    def __init__(self):
+        self.query = None
+
+    def __call__(self, execute, sql, params, many, context):
+        try:
+            return execute(sql, params, many, context)
+        finally:
+            self.query = sql
+
+
+def _get_last_query_capturer(conn):
+    return next((w for w in conn.execute_wrappers if isinstance(w, _LastQueryCaptureWrapper)), None)
+
+
+def _connection(model_cls):
+    return transaction.get_connection(using=model_cls._default_manager.db)
diff --git a/dj_cqrs/utils.py b/dj_cqrs/utils.py
index 259e2db..ae75690 100644
--- a/dj_cqrs/utils.py
+++ b/dj_cqrs/utils.py
@@ -5,8 +5,12 @@
 from uuid import UUID
 
 from django.conf import settings
+from django.db import transaction
 from django.utils import timezone
 
+from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS
+from dj_cqrs.logger import install_last_query_capturer
+
 
 logger = logging.getLogger('django-cqrs')
 
@@ -54,3 +58,25 @@ def get_messages_prefetch_count_per_worker():
 
 def get_json_valid_value(value):
     return str(value) if isinstance(value, (date, datetime, UUID)) else value
+
+
+def apply_query_timeouts(model_cls):  # pragma: no cover
+    query_timeout = int(settings.CQRS['replica'].get('CQRS_QUERY_TIMEOUT', 0))
+    if query_timeout <= 0:
+        return
+
+    model_db = model_cls._default_manager.db
+    conn = transaction.get_connection(using=model_db)
+    conn_vendor = getattr(conn, 'vendor', '')
+    if conn_vendor not in SUPPORTED_TIMEOUT_DB_VENDORS:
+        return
+
+    if conn_vendor == DB_VENDOR_PG:
+        statement = 'SET statement_timeout TO %s'
+    else:
+        statement = 'SET SESSION MAX_EXECUTION_TIME=%s'
+
+    with conn.cursor() as cursor:
+        cursor.execute(statement, params=(query_timeout,))
+
+    install_last_query_capturer(model_cls)
diff --git a/integration_tests/replica_settings.py b/integration_tests/replica_settings.py
index b4d6194..0649041 100644
--- a/integration_tests/replica_settings.py
+++ b/integration_tests/replica_settings.py
@@ -66,6 +66,8 @@
         'delay_queue_max_size': 50,
         'dead_letter_queue': 'dead_letter_replica',
         'dead_message_ttl': 5,
+        'CQRS_QUERY_TIMEOUT': 10000,
+        'CQRS_LOG_TIMED_OUT_QUERIES': 1,
     },
 }
 
diff --git a/tests/test_logger.py b/tests/test_logger.py
new file mode 100644
index 0000000..f07558f
--- /dev/null
+++ b/tests/test_logger.py
@@ -0,0 +1,146 @@
+import logging
+
+import pytest
+from django.db import (
+    DatabaseError,
+    IntegrityError,
+    OperationalError,
+    connection,
+)
+
+from dj_cqrs.logger import (
+    _LastQueryCaptureWrapper,
+    install_last_query_capturer,
+    log_timed_out_queries,
+)
+from tests.dj_replica import models
+
+
+@pytest.mark.django_db(transaction=True)
+def test_install_last_query_capturer():
+    for _ in range(2):
+        install_last_query_capturer(models.AuthorRef)
+
+    assert len(connection.execute_wrappers) == 1
+    assert isinstance(connection.execute_wrappers[0], _LastQueryCaptureWrapper)
+
+    with connection.cursor() as c:
+        c.execute('SELECT 1')
+
+    assert connection.execute_wrappers[0].query == 'SELECT 1'
+
+    connection.execute_wrappers.pop()
+
+
+def test_log_timed_out_queries_not_supported(caplog):
+    assert log_timed_out_queries(None, None) is None
+    assert not caplog.record_tuples
+
+
+@pytest.mark.parametrize(
+    'error',
+    [
+        IntegrityError('some error'),
+        DatabaseError(),
+        OperationalError(),
+    ],
+)
+def test_log_timed_out_queries_other_error(error, settings, caplog):
+    settings.CQRS['replica']['CQRS_LOG_TIMED_OUT_QUERIES'] = 1
+
+    assert log_timed_out_queries(error, None) is None
+    assert not caplog.record_tuples
+
+
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.parametrize(
+    'engine, error, l_name, records',
+    [
+        ('sqlite', None, None, []),
+        (
+            'postgres',
+            OperationalError('canceling statement due to statement timeout'),
+            None,
+            [
+                (
+                    'django-cqrs',
+                    logging.ERROR,
+                    'Timed out query:\nSELECT 1',
+                )
+            ],
+        ),
+        (
+            'postgres',
+            OperationalError('canceling statement due to statement timeout'),
+            'long-query',
+            [
+                (
+                    'long-query',
+                    logging.ERROR,
+                    'Timed out query:\nSELECT 1',
+                )
+            ],
+        ),
+        (
+            'postgres',
+            OperationalError('could not connect to server'),
+            None,
+            [],
+        ),
+        (
+            'postgres',
+            OperationalError(125, 'Some error'),
+            None,
+            [],
+        ),
+        (
+            'mysql',
+            OperationalError(3024),
+            None,
+            [
+                (
+                    'django-cqrs',
+                    logging.ERROR,
+                    'Timed out query:\nSELECT 1',
+                )
+            ],
+        ),
+        (
+            'mysql',
+            OperationalError(
+                3024, 'Query exec was interrupted, max statement execution time exceeded'
+            ),
+            'long-query-1',
+            [
+                (
+                    'long-query-1',
+                    logging.ERROR,
+                    'Timed out query:\nSELECT 1',
+                )
+            ],
+        ),
+        (
+            'mysql',
+            OperationalError(1040, 'Too many connections'),
+            None,
+            [],
+        ),
+    ],
+)
+def test_log_timed_out_queries(settings, engine, l_name, error, records, caplog):
+    if settings.DB_ENGINE != engine:
+        return
+
+    settings.CQRS['replica']['CQRS_LOG_TIMED_OUT_QUERIES'] = True
+    settings.CQRS['replica']['CQRS_QUERY_LOGGER'] = l_name
+
+    model_cls = models.BasicFieldsModelRef
+    install_last_query_capturer(model_cls)
+
+    with connection.cursor() as c:
+        c.execute('SELECT 1')
+
+    assert log_timed_out_queries(error, model_cls) is None
+    assert caplog.record_tuples == records
+
+    connection.execute_wrappers.pop()
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 8025317..34920ab 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -6,16 +6,19 @@
     timedelta,
     timezone,
 )
+from unittest.mock import patch
 from uuid import UUID
 
 import pytest
 
 from dj_cqrs.utils import (
+    apply_query_timeouts,
     get_delay_queue_max_size,
     get_json_valid_value,
     get_message_expiration_dt,
     get_messages_prefetch_count_per_worker,
 )
+from tests.dj_replica import models
 
 
 def test_get_message_expiration_dt_fixed(mocker, settings):
@@ -86,3 +89,23 @@ def test_get_messaged_prefetch_count_per_worker_with_delay_queue(settings):
 )
 def test_get_json_valid_value(value, result):
     assert get_json_valid_value(value) == result
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize(
+    'engine, p_count',
+    [
+        ('sqlite', 0),
+        ('postgres', 1),
+        ('mysql', 1),
+    ],
+)
+def test_apply_query_timeouts(settings, engine, p_count):
+    if settings.DB_ENGINE != engine:
+        return
+
+    settings.CQRS['replica']['CQRS_QUERY_TIMEOUT'] = 1
+    with patch('dj_cqrs.utils.install_last_query_capturer') as p:
+        assert apply_query_timeouts(models.BasicFieldsModelRef) is None
+
+    assert p.call_count == p_count
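
The behaviour added by this patch is controlled entirely through the 'replica' section of the CQRS setting: CQRS_QUERY_TIMEOUT arms the per-session timeout applied by apply_query_timeouts(), CQRS_LOG_TIMED_OUT_QUERIES turns on logging of the last captured statement when that timeout fires, and CQRS_QUERY_LOGGER optionally routes the record to a logger other than the default 'django-cqrs'. The sketch below is illustrative only and is not part of the diff; it mirrors the keys touched by the patch (integration_tests/replica_settings.py uses 10000), and the 'long-queries' logger name is a hypothetical example.

# Illustrative replica settings sketch (not part of this patch).
# Other 'replica' options (queues, retries, etc.) are omitted.
CQRS = {
    'replica': {
        # Sent as 'SET statement_timeout TO %s' on PostgreSQL and
        # 'SET SESSION MAX_EXECUTION_TIME=%s' on MySQL; both read the value in milliseconds.
        'CQRS_QUERY_TIMEOUT': 10000,
        # Log the last captured SQL statement when a query is cancelled by the timeout.
        'CQRS_LOG_TIMED_OUT_QUERIES': 1,
        # Optional: send the 'Timed out query' record to a dedicated logger
        # ('long-queries' is a hypothetical name); falls back to 'django-cqrs' when unset.
        'CQRS_QUERY_LOGGER': 'long-queries',
    },
}

With these keys set, apply_query_timeouts() also installs _LastQueryCaptureWrapper on the model's connection, so the SQL statement that hit the limit is available to log_timed_out_queries() in the consumer's error handler.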