From 3fa19c3d659eea9e58e835013c7cae81b5acff49 Mon Sep 17 00:00:00 2001 From: Arik Fraimovich Date: Wed, 4 Sep 2019 15:12:17 +0300 Subject: [PATCH 1/4] Add interface to implement custom persistence for QueryResult data Co-authored-by: Omer Lachish --- redash/models/__init__.py | 52 +++++++++++++++++-------- redash/settings/dynamic_settings.py | 4 ++ redash/tasks/queries.py | 6 ++- tests/models/test_queries.py | 60 ++++++++++++++++++++++++++++- tests/test_models.py | 43 +-------------------- 5 files changed, 106 insertions(+), 59 deletions(-) diff --git a/redash/models/__init__.py b/redash/models/__init__.py index f49101fa7e..0154897c23 100644 --- a/redash/models/__init__.py +++ b/redash/models/__init__.py @@ -232,9 +232,21 @@ class DataSourceGroup(db.Model): __tablename__ = "data_source_groups" +class DBPersistence(object): + @property + def data(self): + return self._data + + @data.setter + def data(self, data): + self._data = data + + +QueryResultPersistence = settings.dynamic_settings.QueryResultPersistence or DBPersistence + @python_2_unicode_compatible @generic_repr('id', 'org_id', 'data_source_id', 'query_hash', 'runtime', 'retrieved_at') -class QueryResult(db.Model, BelongsToOrgMixin): +class QueryResult(db.Model, BelongsToOrgMixin, QueryResultPersistence): id = Column(db.Integer, primary_key=True) org_id = Column(db.Integer, db.ForeignKey('organizations.id')) org = db.relationship(Organization) @@ -242,7 +254,7 @@ class QueryResult(db.Model, BelongsToOrgMixin): data_source = db.relationship(DataSource, backref=backref('query_results')) query_hash = Column(db.String(32), index=True) query_text = Column('query', db.Text) - data = Column(db.Text) + _data = Column('data', db.Text) runtime = Column(postgresql.DOUBLE_PRECISION) retrieved_at = Column(db.DateTime(True)) @@ -304,22 +316,12 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retri data_source=data_source, retrieved_at=retrieved_at, data=data) + + db.session.add(query_result) logging.info("Inserted query (%s) data; id=%s", query_hash, query_result.id) - # TODO: Investigate how big an impact this select-before-update makes. - queries = Query.query.filter( - Query.query_hash == query_hash, - Query.data_source == data_source - ) - for q in queries: - q.latest_query_data = query_result - # don't auto-update the updated_at timestamp - q.skip_updated_at = True - db.session.add(q) - query_ids = [q.id for q in queries] - logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash) - return query_result, query_ids + return query_result @property def groups(self): @@ -639,6 +641,26 @@ def all_groups_for_query_ids(cls, query_ids): WHERE queries.id in :ids""" return db.session.execute(query, {'ids': tuple(query_ids)}).fetchall() + + @classmethod + def update_latest_result(cls, query_result): + # TODO: Investigate how big an impact this select-before-update makes. 
+ queries = Query.query.filter( + Query.query_hash == query_result.query_hash, + Query.data_source == query_result.data_source + ) + + for q in queries: + q.latest_query_data = query_result + # don't auto-update the updated_at timestamp + q.skip_updated_at = True + db.session.add(q) + + query_ids = [q.id for q in queries] + logging.info("Updated %s queries with result (%s).", len(query_ids), query_result.query_hash) + + return query_ids + def fork(self, user): forked_list = ['org', 'data_source', 'latest_query_data', 'description', diff --git a/redash/settings/dynamic_settings.py b/redash/settings/dynamic_settings.py index c1858d0a91..eec0e224fe 100644 --- a/redash/settings/dynamic_settings.py +++ b/redash/settings/dynamic_settings.py @@ -10,6 +10,10 @@ def query_time_limit(is_scheduled, user_id, org_id): return scheduled_time_limit if is_scheduled else adhoc_time_limit +# This provides the ability to override the way we store QueryResult's data column. +# Reference implementation: redash.models.DBPersistence +QueryResultPersistence = None + # Provide any custom tasks you'd like to run periodically def custom_tasks(): return { diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py index 0ee9cd0ab0..69fbe16ecb 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries.py @@ -385,10 +385,14 @@ def run(self): self.scheduled_query = models.db.session.merge(self.scheduled_query, load=False) self.scheduled_query.schedule_failures = 0 models.db.session.add(self.scheduled_query) - query_result, updated_query_ids = models.QueryResult.store_result( + + query_result = models.QueryResult.store_result( self.data_source.org_id, self.data_source, self.query_hash, self.query, data, run_time, utcnow()) + + updated_query_ids = models.Query.update_latest_result(query_result) + models.db.session.commit() # make sure that alert sees the latest query result self._log_progress('checking_alerts') for query_id in updated_query_ids: diff --git a/tests/models/test_queries.py b/tests/models/test_queries.py index a73caa32d4..f1c45b7bf1 100644 --- a/tests/models/test_queries.py +++ b/tests/models/test_queries.py @@ -2,8 +2,8 @@ from tests import BaseTestCase import datetime -from redash.models import Query, Group, Event, db -from redash.utils import utcnow +from redash.models import Query, QueryResult, Group, Event, db +from redash.utils import utcnow, gen_query_hash import mock @@ -375,3 +375,59 @@ def test_fork_from_query_that_has_no_visualization(self): self.assertEqual(count_table, 1) self.assertEqual(count_vis, 1) + + +class TestQueryUpdateLatestResult(BaseTestCase): + def setUp(self): + super(TestQueryUpdateLatestResult, self).setUp() + self.data_source = self.factory.data_source + self.query = "SELECT 1" + self.query_hash = gen_query_hash(self.query) + self.runtime = 123 + self.utcnow = utcnow() + self.data = "data" + + def test_updates_existing_queries(self): + query1 = self.factory.create_query(query_text=self.query) + query2 = self.factory.create_query(query_text=self.query) + query3 = self.factory.create_query(query_text=self.query) + + query_result = QueryResult.store_result( + self.data_source.org_id, self.data_source, self.query_hash, + self.query, self.data, self.runtime, self.utcnow) + + Query.update_latest_result(query_result) + + self.assertEqual(query1.latest_query_data, query_result) + self.assertEqual(query2.latest_query_data, query_result) + self.assertEqual(query3.latest_query_data, query_result) + + def test_doesnt_update_queries_with_different_hash(self): + query1 = 
self.factory.create_query(query_text=self.query) + query2 = self.factory.create_query(query_text=self.query) + query3 = self.factory.create_query(query_text=self.query + "123") + + query_result = QueryResult.store_result( + self.data_source.org_id, self.data_source, self.query_hash, + self.query, self.data, self.runtime, self.utcnow) + + Query.update_latest_result(query_result) + + self.assertEqual(query1.latest_query_data, query_result) + self.assertEqual(query2.latest_query_data, query_result) + self.assertNotEqual(query3.latest_query_data, query_result) + + def test_doesnt_update_queries_with_different_data_source(self): + query1 = self.factory.create_query(query_text=self.query) + query2 = self.factory.create_query(query_text=self.query) + query3 = self.factory.create_query(query_text=self.query, data_source=self.factory.create_data_source()) + + query_result = QueryResult.store_result( + self.data_source.org_id, self.data_source, self.query_hash, + self.query, self.data, self.runtime, self.utcnow) + + Query.update_latest_result(query_result) + + self.assertEqual(query1.latest_query_data, query_result) + self.assertEqual(query2.latest_query_data, query_result) + self.assertNotEqual(query3.latest_query_data, query_result) \ No newline at end of file diff --git a/tests/test_models.py b/tests/test_models.py index 3e222e0d66..611ccd8ac8 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -299,7 +299,7 @@ def test_archive_query_sets_flag(self): def test_archived_query_doesnt_return_in_all(self): query = self.factory.create_query(schedule={'interval':'1', 'until':None, 'time': None, 'day_of_week':None}) yesterday = utcnow() - datetime.timedelta(days=1) - query_result, _ = models.QueryResult.store_result( + query_result = models.QueryResult.store_result( query.org_id, query.data_source, query.query_hash, query.query_text, "1", 123, yesterday) @@ -438,7 +438,7 @@ def setUp(self): self.data = "data" def test_stores_the_result(self): - query_result, _ = models.QueryResult.store_result( + query_result = models.QueryResult.store_result( self.data_source.org_id, self.data_source, self.query_hash, self.query, self.data, self.runtime, self.utcnow) @@ -449,45 +449,6 @@ def test_stores_the_result(self): self.assertEqual(query_result.query_hash, self.query_hash) self.assertEqual(query_result.data_source, self.data_source) - def test_updates_existing_queries(self): - query1 = self.factory.create_query(query_text=self.query) - query2 = self.factory.create_query(query_text=self.query) - query3 = self.factory.create_query(query_text=self.query) - - query_result, _ = models.QueryResult.store_result( - self.data_source.org_id, self.data_source, self.query_hash, - self.query, self.data, self.runtime, self.utcnow) - - self.assertEqual(query1.latest_query_data, query_result) - self.assertEqual(query2.latest_query_data, query_result) - self.assertEqual(query3.latest_query_data, query_result) - - def test_doesnt_update_queries_with_different_hash(self): - query1 = self.factory.create_query(query_text=self.query) - query2 = self.factory.create_query(query_text=self.query) - query3 = self.factory.create_query(query_text=self.query + "123") - - query_result, _ = models.QueryResult.store_result( - self.data_source.org_id, self.data_source, self.query_hash, - self.query, self.data, self.runtime, self.utcnow) - - self.assertEqual(query1.latest_query_data, query_result) - self.assertEqual(query2.latest_query_data, query_result) - self.assertNotEqual(query3.latest_query_data, query_result) - - def 
test_doesnt_update_queries_with_different_data_source(self): - query1 = self.factory.create_query(query_text=self.query) - query2 = self.factory.create_query(query_text=self.query) - query3 = self.factory.create_query(query_text=self.query, data_source=self.factory.create_data_source()) - - query_result, _ = models.QueryResult.store_result( - self.data_source.org_id, self.data_source, self.query_hash, - self.query, self.data, self.runtime, self.utcnow) - - self.assertEqual(query1.latest_query_data, query_result) - self.assertEqual(query2.latest_query_data, query_result) - self.assertNotEqual(query3.latest_query_data, query_result) - class TestEvents(BaseTestCase): def raw_event(self): From fda74a2d1d4ce897afc2ce2af5ee7b148bcd913d Mon Sep 17 00:00:00 2001 From: Arik Fraimovich Date: Mon, 16 Sep 2019 12:49:18 +0300 Subject: [PATCH 2/4] Deserialize query results data in the model --- redash/models/__init__.py | 24 +++++++++++++++--------- redash/models/parameterized_query.py | 2 +- redash/query_runner/python.py | 2 +- redash/serializers/query_result.py | 4 ++-- tests/tasks/test_queries.py | 6 ++++-- tests/test_models.py | 4 ++-- 6 files changed, 25 insertions(+), 17 deletions(-) diff --git a/redash/models/__init__.py b/redash/models/__init__.py index 0154897c23..009d561b16 100644 --- a/redash/models/__init__.py +++ b/redash/models/__init__.py @@ -235,7 +235,13 @@ class DataSourceGroup(db.Model): class DBPersistence(object): @property def data(self): - return self._data + if self._data is None: + return None + + if not hasattr(self, '_deserialized_data'): + self._deserialized_data = json_loads(self._data) + + return self._deserialized_data @data.setter def data(self, data): @@ -268,7 +274,7 @@ def to_dict(self): 'id': self.id, 'query_hash': self.query_hash, 'query': self.query_text, - 'data': json_loads(self.data), + 'data': self.data, 'data_source_id': self.data_source_id, 'runtime': self.runtime, 'retrieved_at': self.retrieved_at @@ -316,12 +322,12 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retri data_source=data_source, retrieved_at=retrieved_at, data=data) - - + + db.session.add(query_result) logging.info("Inserted query (%s) data; id=%s", query_hash, query_result.id) - return query_result + return query_result @property def groups(self): @@ -641,7 +647,7 @@ def all_groups_for_query_ids(cls, query_ids): WHERE queries.id in :ids""" return db.session.execute(query, {'ids': tuple(query_ids)}).fetchall() - + @classmethod def update_latest_result(cls, query_result): # TODO: Investigate how big an impact this select-before-update makes. 
@@ -658,7 +664,7 @@ def update_latest_result(cls, query_result): query_ids = [q.id for q in queries] logging.info("Updated %s queries with result (%s).", len(query_ids), query_result.query_hash) - + return query_ids @@ -810,7 +816,7 @@ def get_by_id_and_org(cls, object_id, org): return super(Alert, cls).get_by_id_and_org(object_id, org, Query) def evaluate(self): - data = json_loads(self.query_rel.latest_query_data.data) + data = self.query_rel.latest_query_data.data if data['rows'] and self.options['column'] in data['rows'][0]: operators = { @@ -847,7 +853,7 @@ def render_template(self, template): if template is None: return '' - data = json_loads(self.query_rel.latest_query_data.data) + data = self.query_rel.latest_query_data.data host = base_url(self.query_rel.org) col_name = self.options['column'] diff --git a/redash/models/parameterized_query.py b/redash/models/parameterized_query.py index c42eeaf581..2002e4b7a2 100644 --- a/redash/models/parameterized_query.py +++ b/redash/models/parameterized_query.py @@ -24,7 +24,7 @@ def _load_result(query_id, org): if query.data_source: query_result = models.QueryResult.get_by_id_and_org(query.latest_query_data_id, org) - return json_loads(query_result.data) + return query_result.data else: raise QueryDetachedFromDataSourceError(query_id) diff --git a/redash/query_runner/python.py b/redash/query_runner/python.py index 8a516965d3..972b603add 100644 --- a/redash/query_runner/python.py +++ b/redash/query_runner/python.py @@ -207,7 +207,7 @@ def get_query_result(query_id): if query.latest_query_data.data is None: raise Exception("Query does not have results yet.") - return json_loads(query.latest_query_data.data) + return query.latest_query_data.data def get_current_user(self): return self._current_user.to_dict() diff --git a/redash/serializers/query_result.py b/redash/serializers/query_result.py index ff737e3a28..4cc8e827c0 100644 --- a/redash/serializers/query_result.py +++ b/redash/serializers/query_result.py @@ -68,7 +68,7 @@ def serialize_query_result(query_result, is_api_user): def serialize_query_result_to_csv(query_result): s = cStringIO.StringIO() - query_data = json_loads(query_result.data) + query_data = query_result.data fieldnames, special_columns = _get_column_lists(query_data['columns'] or []) @@ -89,7 +89,7 @@ def serialize_query_result_to_csv(query_result): def serialize_query_result_to_xlsx(query_result): s = cStringIO.StringIO() - query_data = json_loads(query_result.data) + query_data = query_result.data book = xlsxwriter.Workbook(s, {'constant_memory': True}) sheet = book.add_worksheet("result") diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index d542c3991e..2afb962de6 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -6,6 +6,7 @@ from tests import BaseTestCase from redash import redis_connection, models +from redash.utils import json_dumps from redash.query_runner.pg import PostgreSQL from redash.tasks.queries import QueryExecutionError, enqueue_query, execute_query @@ -57,11 +58,12 @@ def test_success(self): """ cm = mock.patch("celery.app.task.Context.delivery_info", {'routing_key': 'test'}) with cm, mock.patch.object(PostgreSQL, "run_query") as qr: - qr.return_value = ([1, 2], None) + query_result_data = {"columns": [], "rows": []} + qr.return_value = (json_dumps(query_result_data), None) result_id = execute_query("SELECT 1, 2", self.factory.data_source.id, {}) self.assertEqual(1, qr.call_count) result = models.QueryResult.query.get(result_id) - 
self.assertEqual(result.data, '{1,2}') + self.assertEqual(result.data, query_result_data) def test_success_scheduled(self): """ diff --git a/tests/test_models.py b/tests/test_models.py index 611ccd8ac8..8ca46f0819 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -435,14 +435,14 @@ def setUp(self): self.query_hash = gen_query_hash(self.query) self.runtime = 123 self.utcnow = utcnow() - self.data = "data" + self.data = '{"a": 1}' def test_stores_the_result(self): query_result = models.QueryResult.store_result( self.data_source.org_id, self.data_source, self.query_hash, self.query, self.data, self.runtime, self.utcnow) - self.assertEqual(query_result.data, self.data) + self.assertEqual(query_result._data, self.data) self.assertEqual(query_result.runtime, self.runtime) self.assertEqual(query_result.retrieved_at, self.utcnow) self.assertEqual(query_result.query_text, self.query) From 068b9ee7e62ae7dab5a5917d1ff10115b61ef77d Mon Sep 17 00:00:00 2001 From: Arik Fraimovich Date: Sun, 22 Sep 2019 09:27:24 +0300 Subject: [PATCH 3/4] Change order of mixins. --- redash/models/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redash/models/__init__.py b/redash/models/__init__.py index 009d561b16..b241f5fab2 100644 --- a/redash/models/__init__.py +++ b/redash/models/__init__.py @@ -252,7 +252,7 @@ def data(self, data): @python_2_unicode_compatible @generic_repr('id', 'org_id', 'data_source_id', 'query_hash', 'runtime', 'retrieved_at') -class QueryResult(db.Model, BelongsToOrgMixin, QueryResultPersistence): +class QueryResult(db.Model, QueryResultPersistence, BelongsToOrgMixin): id = Column(db.Integer, primary_key=True) org_id = Column(db.Integer, db.ForeignKey('organizations.id')) org = db.relationship(Organization) From 32de5dddef6b3d5f692724058a3b8c259a7b5523 Mon Sep 17 00:00:00 2001 From: Arik Fraimovich Date: Mon, 7 Oct 2019 18:45:29 +0300 Subject: [PATCH 4/4] Make DBPersistence.data setter in sycn with getter + tests --- redash/models/__init__.py | 8 ++++++-- tests/models/test_query_results.py | 24 +++++++++++++++++++++++- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/redash/models/__init__.py b/redash/models/__init__.py index b241f5fab2..25744d264f 100644 --- a/redash/models/__init__.py +++ b/redash/models/__init__.py @@ -232,19 +232,23 @@ class DataSourceGroup(db.Model): __tablename__ = "data_source_groups" +DESERIALIZED_DATA_ATTR = '_deserialized_data' + class DBPersistence(object): @property def data(self): if self._data is None: return None - if not hasattr(self, '_deserialized_data'): - self._deserialized_data = json_loads(self._data) + if not hasattr(self, DESERIALIZED_DATA_ATTR): + setattr(self, DESERIALIZED_DATA_ATTR, json_loads(self._data)) return self._deserialized_data @data.setter def data(self, data): + if hasattr(self, DESERIALIZED_DATA_ATTR): + delattr(self, DESERIALIZED_DATA_ATTR) self._data = data diff --git a/tests/models/test_query_results.py b/tests/models/test_query_results.py index 751cbf95a3..5608c23913 100644 --- a/tests/models/test_query_results.py +++ b/tests/models/test_query_results.py @@ -1,9 +1,12 @@ #encoding: utf8 import datetime +from unittest import TestCase from tests import BaseTestCase +from mock import patch from redash import models +from redash.models import DBPersistence from redash.utils import utcnow, json_dumps @@ -66,4 +69,23 @@ def test_store_result_does_not_modify_query_update_at(self): models.QueryResult.store_result(query.org_id, query.data_source, query.query_hash, query.query_text, "", 0, 
utcnow()) - self.assertEqual(original_updated_at, query.updated_at) \ No newline at end of file + self.assertEqual(original_updated_at, query.updated_at) + + +class TestDBPersistence(TestCase): + def test_updating_data_removes_cached_result(self): + p = DBPersistence() + p.data = '{"test": 1}' + self.assertDictEqual(p.data, {"test": 1}) + p.data = '{"test": 2}' + self.assertDictEqual(p.data, {"test": 2}) + + @patch('redash.models.json_loads') + def test_calls_json_loads_only_once(self, json_loads_patch): + json_loads_patch.return_value = '1' + p = DBPersistence() + json_data = '{"test": 1}' + p.data = json_data + a = p.data + b = p.data + json_loads_patch.assert_called_once_with(json_data) \ No newline at end of file
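
For readers who want to plug in their own storage backend, here is a minimal sketch of what a custom persistence mixin might look like. Only the `data` property contract, the `_data`/"data" column mapping, and the `QueryResultPersistence` hook in redash/settings/dynamic_settings.py come from the patches above; the FileSystemPersistence class, STORAGE_DIR, and the file-per-result layout are hypothetical illustration, not part of this change.

# Hypothetical custom persistence backend. It follows the same property
# contract as redash.models.DBPersistence: the setter receives the serialized
# result payload, the getter returns the deserialized dict, and `_data` maps
# to the model's "data" column once the class is mixed into QueryResult.
import json
import os
import uuid

STORAGE_DIR = "/tmp/query_results"  # hypothetical location for result payloads


class FileSystemPersistence(object):
    @property
    def data(self):
        if self._data is None:
            return None

        # Deserialize lazily and cache, mirroring DBPersistence; here `_data`
        # holds the path written by the setter instead of the payload itself.
        if not hasattr(self, '_deserialized_data'):
            with open(self._data) as f:
                self._deserialized_data = json.loads(f.read())

        return self._deserialized_data

    @data.setter
    def data(self, data):
        # Drop any cached deserialized copy so the getter stays in sync,
        # matching the behavior added to DBPersistence in PATCH 4/4.
        if hasattr(self, '_deserialized_data'):
            delattr(self, '_deserialized_data')

        if data is None:
            self._data = None
            return

        if not os.path.isdir(STORAGE_DIR):
            os.makedirs(STORAGE_DIR)

        # Write the serialized payload to disk and keep only its path in the
        # database column.
        path = os.path.join(STORAGE_DIR, uuid.uuid4().hex + ".json")
        with open(path, "w") as f:
            f.write(data)

        self._data = path

To enable such a class, a deployment would point the setting introduced in redash/settings/dynamic_settings.py at it, e.g. QueryResultPersistence = FileSystemPersistence. When the setting is left as None, QueryResult falls back to DBPersistence and results keep living in the data column exactly as before.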