From bcd0d2034d3db27ef30d027491d856981a82d0f9 Mon Sep 17 00:00:00 2001 From: Marina Samuel Date: Fri, 8 Jun 2018 13:23:48 -0400 Subject: [PATCH] Closes #415: Make data source health monitoring an extension. --- redash/monitor.py | 3 - redash/query_runner/__init__.py | 6 +- redash/settings/__init__.py | 13 +-- redash/settings/helpers.py | 6 -- redash/tasks/__init__.py | 1 - redash/tasks/health.py | 59 -------------- redash/worker.py | 11 ++- requirements.txt | 2 +- tests/tasks/test_health.py | 136 -------------------------------- 9 files changed, 11 insertions(+), 226 deletions(-) delete mode 100644 redash/tasks/health.py delete mode 100644 tests/tasks/test_health.py diff --git a/redash/monitor.py b/redash/monitor.py index ced9b42920..12840b608e 100644 --- a/redash/monitor.py +++ b/redash/monitor.py @@ -1,4 +1,3 @@ -import json from redash import redis_connection, models, __version__, settings @@ -15,10 +14,8 @@ def get_object_counts(): status['unused_query_results_count'] = models.QueryResult.unused().count() status['dashboards_count'] = models.Dashboard.query.count() status['widgets_count'] = models.Widget.query.count() - status['data_sources'] = json.loads(redis_connection.get('data_sources:health') or '{}') return status - def get_queues(): queues = {} for ds in models.DataSource.query: diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py index e88ec28ddf..2851587f78 100644 --- a/redash/query_runner/__init__.py +++ b/redash/query_runner/__init__.py @@ -102,12 +102,10 @@ def get_data_source_version(self): return version - def test_connection(self, custom_query_text=None): + def test_connection(self): if self.noop_query is None: raise NotImplementedError() - - query_text = custom_query_text or self.noop_query - data, error = self.run_query(query_text, None) + data, error = self.run_query(self.noop_query, None) if error is not None: raise Exception(error) diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py index b7b88c64d2..d2ad704802 100644 --- a/redash/settings/__init__.py +++ b/redash/settings/__init__.py @@ -1,7 +1,7 @@ import os from funcy import distinct, remove -from .helpers import parse_db_url, fix_assets_path, array_from_string, parse_boolean, int_or_none, set_from_string, dict_from_string +from .helpers import parse_db_url, fix_assets_path, array_from_string, parse_boolean, int_or_none, set_from_string def all_settings(): @@ -242,14 +242,3 @@ def all_settings(): # Allow Parameters in Embeds # WARNING: With this option enabled, Redash reads query parameters from the request URL (risk of SQL injection!) ALLOW_PARAMETERS_IN_EMBEDS = parse_boolean(os.environ.get("REDASH_ALLOW_PARAMETERS_IN_EMBEDS", "false")) - -# Allow for a map of custom queries to test data source performance and availability. 
-# A sample map may look like: -# { -# "1": "select 1;", -# "5": "select 1;" -# } -CUSTOM_HEALTH_QUERIES = dict_from_string(os.environ.get("REDASH_CUSTOM_HEALTH_QUERIES", "")) - -# Frequency of health query runs in minutes (12 hours by default) -HEALTH_QUERIES_REFRESH_SCHEDULE = int(os.environ.get("REDASH_HEALTH_QUERIES_REFRESH_SCHEDULE", 720)) diff --git a/redash/settings/helpers.py b/redash/settings/helpers.py index cfe69bd38d..e55d61001d 100644 --- a/redash/settings/helpers.py +++ b/redash/settings/helpers.py @@ -33,12 +33,6 @@ def array_from_string(s): return [item.strip() for item in array] -def dict_from_string(s): - try: - return json.loads(s) - except ValueError: - return {} - def set_from_string(s): return set(array_from_string(s)) diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py index 05d51aae50..f242e4c516 100644 --- a/redash/tasks/__init__.py +++ b/redash/tasks/__init__.py @@ -1,4 +1,3 @@ from .general import record_event, version_check, send_mail -from .health import health_status from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_tasks, cleanup_query_results, execute_query from .alerts import check_alerts_for_query \ No newline at end of file diff --git a/redash/tasks/health.py b/redash/tasks/health.py deleted file mode 100644 index 2502b32ddf..0000000000 --- a/redash/tasks/health.py +++ /dev/null @@ -1,59 +0,0 @@ -import json -import time -from random import randint - -from celery.utils.log import get_task_logger -from redash import models, redis_connection, settings, statsd_client -from redash.worker import celery -from redash.utils import parse_human_time - -logger = get_task_logger(__name__) - - -def update_health_status(data_source_id, data_source_name, query_text, data): - key = "data_sources:health" - - cache = json.loads(redis_connection.get(key) or '{}') - if data_source_id not in cache: - cache[data_source_id] = { - "metadata": { "name": data_source_name }, - "queries": {} - } - cache[data_source_id]["queries"][query_text] = data - - cache[data_source_id]["status"] = "SUCCESS" - for query_status in cache[data_source_id]["queries"].values(): - if query_status["status"] == "FAIL": - cache[data_source_id]["status"] = "FAIL" - break - - redis_connection.set(key, json.dumps(cache)) - -@celery.task(name="redash.tasks.health_status", time_limit=90, soft_time_limit=60) -def health_status(): - for ds in models.DataSource.query: - logger.info(u"task=health_status state=start ds_id=%s", ds.id) - - runtime = None - query_text = ds.query_runner.noop_query - custom_queries = settings.CUSTOM_HEALTH_QUERIES - ds_id = str(ds.id) - - if custom_queries and ds_id in custom_queries: - query_text = custom_queries[ds_id] - - try: - start_time = time.time() - ds.query_runner.test_connection(query_text) - runtime = time.time() - start_time - except Exception as e: - logger.warning(u"Failed health check for the data source: %s", ds.name, exc_info=1) - statsd_client.incr('health_status.error') - logger.info(u"task=health_status state=error ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) - - update_health_status(ds_id, ds.name, query_text, { - "status": "SUCCESS" if runtime is not None else "FAIL", - "last_run": start_time, - "last_run_human": str(parse_human_time(str(start_time))), - "runtime": runtime - }) diff --git a/redash/worker.py b/redash/worker.py index 668da00735..d2add5a218 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -16,10 +16,6 @@ include='redash.tasks') celery_schedule = { - 'health_status': { - 'task': 
'redash.tasks.health_status', - 'schedule': timedelta(minutes=settings.HEALTH_QUERIES_REFRESH_SCHEDULE) - }, 'refresh_queries': { 'task': 'redash.tasks.refresh_queries', 'schedule': timedelta(seconds=30) @@ -82,3 +78,10 @@ def __call__(self, *args, **kwargs): def init_celery_flask_app(**kwargs): app = create_app() app.app_context().push() + +@celery.on_after_configure.connect +def add_periodic_tasks(sender, **kwargs): + app = create_app() + periodic_tasks = getattr(app, 'periodic_tasks', {}) + for params in periodic_tasks.values(): + sender.add_periodic_task(**params) diff --git a/requirements.txt b/requirements.txt index 49e33b71e8..ca5229e88a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -54,4 +54,4 @@ disposable-email-domains # Uncomment the requirement for ldap3 if using ldap. # It is not included by default because of the GPL license conflict. # ldap3==2.2.4 -redash-stmo>=2018.4.0 +redash-stmo>=2018.8.1 diff --git a/tests/tasks/test_health.py b/tests/tasks/test_health.py deleted file mode 100644 index 37e4f9ea7d..0000000000 --- a/tests/tasks/test_health.py +++ /dev/null @@ -1,136 +0,0 @@ -import json -import mock -from tests import BaseTestCase - -from redash import redis_connection -from redash.tasks.health import update_health_status, health_status - - -class TestHealthStatus(BaseTestCase): - def setUp(self): - super(TestHealthStatus, self).setUp() - self.patched_custom_queries = self._setup_mock('redash.tasks.health.settings') - self.patched_updated_health_status = self._setup_mock('redash.tasks.health.update_health_status') - self.patched_run_query = self._setup_mock('redash.query_runner.pg.PostgreSQL.run_query') - - self.patched_run_query.return_value = ("some_data", None) - self.patched_custom_queries.CUSTOM_HEALTH_QUERIES = "" - - def _setup_mock(self, function_to_patch): - patcher = mock.patch(function_to_patch) - patched_function = patcher.start() - self.addCleanup(patcher.stop) - return patched_function - - def test_update_health_status_sets_correct_keys(self): - current_health = redis_connection.get('data_sources:health') - self.assertEqual(None, current_health) - - DATA_SOURCE = self.factory.create_data_source() - QUERY_SUCCESS = "SELECT 1" - QUERY_FAIL = "SELECT meep" - SOME_DATA_FAIL = {"a": "b", "foo": "bar", "status": "FAIL"} - SOME_DATA_SUCCESS = {"a": "b", "foo": "bar", "status": "SUCCESS"} - update_health_status(str(DATA_SOURCE.id), DATA_SOURCE.name, QUERY_FAIL, SOME_DATA_FAIL) - update_health_status(str(DATA_SOURCE.id), DATA_SOURCE.name, QUERY_SUCCESS, SOME_DATA_SUCCESS) - - ''' - The expected format of the cached health data is: - { - "": { - "metadata": "", - "queries": { - "": {...}, - "": {...}, - "": {...}, - ... - } - }, - ... - } - ''' - current_health = json.loads(redis_connection.get('data_sources:health')) - - # There is 1 data source. - self.assertEqual(1, len(current_health.keys())) - self.assertEqual(DATA_SOURCE.id, int(current_health.keys()[0])) - - # The data source has "metadata", "queries" and "status" keys. 
- ds_id = str(DATA_SOURCE.id) - self.assertEqual(3, len(current_health[ds_id].keys())) - self.assertTrue("metadata" in current_health[ds_id].keys()) - self.assertTrue("queries" in current_health[ds_id].keys()) - self.assertTrue("status" in current_health[ds_id].keys()) - - # There are two queries with correct data - self.assertEqual(2, len(current_health[ds_id]["queries"])) - self.assertTrue(QUERY_SUCCESS in current_health[ds_id]["queries"].keys()) - self.assertTrue(QUERY_FAIL in current_health[ds_id]["queries"].keys()) - self.assertEqual(SOME_DATA_FAIL, current_health[ds_id]["queries"][QUERY_FAIL]) - self.assertEqual(SOME_DATA_SUCCESS, current_health[ds_id]["queries"][QUERY_SUCCESS]) - self.assertEqual(SOME_DATA_FAIL["status"], current_health[ds_id]["status"]) - - def test_health_status_success(self): - data_sources = [] - for i in range(5): - data_sources.append(self.factory.create_data_source()) - - health_status() - - # Status is updated for each of the 5 data sources - self.assertEqual(self.patched_updated_health_status.call_count, 5) - - # The data source name and id is correctly passed in the last call of update_health_status() - args, kwargs = self.patched_updated_health_status.call_args - self.assertEqual(str(data_sources[-1].id), args[0]) - self.assertEqual(data_sources[-1].name, args[1]) - - # All expected status keys are available. - EXPECTED_KEYS = ["status", "last_run", "last_run_human", "runtime"] - NEW_STATUS = args[3] - new_status_keys = set(NEW_STATUS.keys()) - self.assertEqual(set(EXPECTED_KEYS), new_status_keys) - - self.assertEqual("SUCCESS", NEW_STATUS["status"]) - for key in EXPECTED_KEYS[1:]: - self.assertIsNotNone(NEW_STATUS[key]) - - def test_health_status_run_query_throws_exception(self): - data_source = self.factory.create_data_source() - - def exception_raiser(*args, **kwargs): - raise Exception - - self.patched_run_query.side_effect = exception_raiser - health_status() - - # Status is updated for the one data source - self.assertEqual(self.patched_updated_health_status.call_count, 1) - - # The data source name is correctly passed in the last call of update_health_status() - args, kwargs = self.patched_updated_health_status.call_args - self.assertEqual(str(data_source.id), args[0]) - self.assertEqual(data_source.name, args[1]) - self.assertEqual(data_source.query_runner.noop_query, args[2]) - - # All expected status keys are available. - EXPECTED_KEYS = ['status', 'last_run', 'last_run_human', 'runtime'] - NEW_STATUS = args[3] - new_status_keys = set(NEW_STATUS.keys()) - self.assertEqual(set(EXPECTED_KEYS), new_status_keys) - - self.assertEqual('FAIL', NEW_STATUS['status']) - self.assertIsNotNone(NEW_STATUS['last_run']) - self.assertIsNotNone(NEW_STATUS['last_run_human']) - self.assertIsNone(NEW_STATUS['runtime']) - - def test_health_status_custom_query(self): - CUSTOM_QUERY = "select * from table" - data_source = self.factory.create_data_source() - self.patched_custom_queries.CUSTOM_HEALTH_QUERIES = {"1": CUSTOM_QUERY} - - health_status() - - args, kwargs = self.patched_updated_health_status.call_args - self.assertNotEqual(data_source.query_runner.noop_query, args[2]) - self.assertEqual(CUSTOM_QUERY, args[2])
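
Note on the scheduling hook (illustrative sketch, not part of the patch): with the health monitor moved out to redash-stmo, worker.py no longer hard-codes a 'health_status' entry in celery_schedule. Instead, the new add_periodic_tasks() signal handler creates the Flask app, reads a periodic_tasks dict off it, and forwards each entry to Celery with sender.add_periodic_task(**params). A minimal sketch of how an extension loaded during create_app() could register its task through that registry -- the module name, task body, and the "sig"/"schedule" keys below are assumptions for illustration, not taken from this patch or from redash-stmo itself:

    # hypothetical_extension.py -- illustrative only
    from datetime import timedelta

    from redash.worker import celery


    @celery.task(name="redash_stmo.health_status")
    def health_status():
        # Placeholder body; the real check ships in the redash-stmo package.
        pass


    def register_periodic_task(app):
        # add_periodic_tasks() in redash/worker.py calls
        # sender.add_periodic_task(**params); Celery's
        # add_periodic_task(schedule, sig, ...) accepts both of these
        # names as keyword arguments.
        periodic_tasks = getattr(app, "periodic_tasks", {})
        periodic_tasks["health_status"] = {
            "sig": health_status.s(),
            "schedule": timedelta(hours=12),
        }
        app.periodic_tasks = periodic_tasks

Calling create_app() inside the signal handler is presumably what gives extensions a chance to populate app.periodic_tasks before the beat schedule is finalized.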
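
Relatedly, test_connection() goes back to taking no query argument, so the per-data-source custom queries that REDASH_CUSTOM_HEALTH_QUERIES used to feed it now have to be run by the caller. A rough extension-side sketch, assuming the extension keeps its own query map; check_data_source() and its return shape are made up here and only mirror the status dict the deleted redash/tasks/health.py produced (minus last_run_human):

    import time


    def check_data_source(data_source, custom_query=None):
        # Fall back to the runner's noop query, as the deleted
        # redash/tasks/health.py did when no custom query was configured.
        query_text = custom_query or data_source.query_runner.noop_query
        start = time.time()
        _data, error = data_source.query_runner.run_query(query_text, None)
        return {
            "status": "SUCCESS" if error is None else "FAIL",
            "last_run": start,
            "runtime": None if error is not None else time.time() - start,
        }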