Add interface to abstract query result persistence #4147

Merged: 4 commits, Oct 10, 2019
68 changes: 50 additions & 18 deletions redash/models/__init__.py
@@ -232,17 +232,39 @@ class DataSourceGroup(db.Model):
__tablename__ = "data_source_groups"


DESERIALIZED_DATA_ATTR = '_deserialized_data'

class DBPersistence(object):
@property
def data(self):
if self._data is None:
Member: Shouldn't this also check whether self._deserialized_data has already been set and return it if so (and also unset it in the property setter below)?

Member Author: In what case would _data be None while _deserialized_data has a value?

Member: This is basically a race condition: if _data is set back to None via the data property setter below after it previously had a value, the cached _deserialized_data would still be returned. If data is only meant to be a read-only property, I would suggest removing the property setter.

Member Author: I don't know. It feels like this complicates the code for no good reason, as this class has a very well-defined use 🤔

Member Author: Read your comment again and realized that keeping _deserialized_data in sync with the setter isn't that complex after all. Implemented in 16fbc5e, with tests added.

return None

if not hasattr(self, DESERIALIZED_DATA_ATTR):
setattr(self, DESERIALIZED_DATA_ATTR, json_loads(self._data))

return self._deserialized_data

@data.setter
def data(self, data):
if hasattr(self, DESERIALIZED_DATA_ATTR):
delattr(self, DESERIALIZED_DATA_ATTR)
self._data = data


QueryResultPersistence = settings.dynamic_settings.QueryResultPersistence or DBPersistence
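To make the review thread above concrete, here is a minimal sketch of the staleness that the setter's delattr guards against (assuming from redash.models import DBPersistence; the literal values are illustrative only):

from redash.models import DBPersistence

p = DBPersistence()
p.data = '{"test": 1}'
assert p.data == {"test": 1}  # first read runs json_loads and caches the result
p.data = '{"test": 2}'        # the setter drops the cached _deserialized_data
assert p.data == {"test": 2}  # without that delattr, the stale {"test": 1} would still be returned

The same behavior is exercised by the tests added in tests/models/test_query_results.py below.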

@python_2_unicode_compatible
@generic_repr('id', 'org_id', 'data_source_id', 'query_hash', 'runtime', 'retrieved_at')
class QueryResult(db.Model, BelongsToOrgMixin):
class QueryResult(db.Model, QueryResultPersistence, BelongsToOrgMixin):
id = Column(db.Integer, primary_key=True)
org_id = Column(db.Integer, db.ForeignKey('organizations.id'))
org = db.relationship(Organization)
data_source_id = Column(db.Integer, db.ForeignKey("data_sources.id"))
data_source = db.relationship(DataSource, backref=backref('query_results'))
query_hash = Column(db.String(32), index=True)
query_text = Column('query', db.Text)
data = Column(db.Text)
_data = Column('data', db.Text)
runtime = Column(postgresql.DOUBLE_PRECISION)
retrieved_at = Column(db.DateTime(True))

@@ -256,7 +278,7 @@ def to_dict(self):
'id': self.id,
'query_hash': self.query_hash,
'query': self.query_text,
'data': json_loads(self.data),
'data': self.data,
Member Author: We now deserialize the data in a single place instead of having every caller run json_loads.

'data_source_id': self.data_source_id,
'runtime': self.runtime,
'retrieved_at': self.retrieved_at
@@ -304,22 +326,12 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retrieved_at):
data_source=data_source,
retrieved_at=retrieved_at,
data=data)


db.session.add(query_result)
logging.info("Inserted query (%s) data; id=%s", query_hash, query_result.id)
# TODO: Investigate how big an impact this select-before-update makes.
queries = Query.query.filter(
Query.query_hash == query_hash,
Query.data_source == data_source
)
for q in queries:
q.latest_query_data = query_result
# don't auto-update the updated_at timestamp
q.skip_updated_at = True
db.session.add(q)
query_ids = [q.id for q in queries]
logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash)
Member Author: We moved this to a separate class method on the Query model. It ultimately wasn't needed for this refactor, but it's a good separation regardless, so we kept it.


return query_result, query_ids
return query_result

@property
def groups(self):
@@ -640,6 +652,26 @@ def all_groups_for_query_ids(cls, query_ids):

return db.session.execute(query, {'ids': tuple(query_ids)}).fetchall()

@classmethod
def update_latest_result(cls, query_result):
# TODO: Investigate how big an impact this select-before-update makes.
queries = Query.query.filter(
Query.query_hash == query_result.query_hash,
Query.data_source == query_result.data_source
)

for q in queries:
q.latest_query_data = query_result
# don't auto-update the updated_at timestamp
q.skip_updated_at = True
db.session.add(q)

query_ids = [q.id for q in queries]
logging.info("Updated %s queries with result (%s).", len(query_ids), query_result.query_hash)

return query_ids


def fork(self, user):
forked_list = ['org', 'data_source', 'latest_query_data', 'description',
'query_text', 'query_hash', 'options']
@@ -788,7 +820,7 @@ def get_by_id_and_org(cls, object_id, org):
return super(Alert, cls).get_by_id_and_org(object_id, org, Query)

def evaluate(self):
data = json_loads(self.query_rel.latest_query_data.data)
data = self.query_rel.latest_query_data.data

if data['rows'] and self.options['column'] in data['rows'][0]:
operators = {
@@ -825,7 +857,7 @@ def render_template(self, template):
if template is None:
return ''

data = json_loads(self.query_rel.latest_query_data.data)
data = self.query_rel.latest_query_data.data
host = base_url(self.query_rel.org)

col_name = self.options['column']
2 changes: 1 addition & 1 deletion redash/models/parameterized_query.py
@@ -24,7 +24,7 @@ def _load_result(query_id, org):

if query.data_source:
query_result = models.QueryResult.get_by_id_and_org(query.latest_query_data_id, org)
return json_loads(query_result.data)
return query_result.data
else:
raise QueryDetachedFromDataSourceError(query_id)

2 changes: 1 addition & 1 deletion redash/query_runner/python.py
@@ -207,7 +207,7 @@ def get_query_result(query_id):
if query.latest_query_data.data is None:
raise Exception("Query does not have results yet.")

return json_loads(query.latest_query_data.data)
return query.latest_query_data.data

def get_current_user(self):
return self._current_user.to_dict()
4 changes: 2 additions & 2 deletions redash/serializers/query_result.py
@@ -68,7 +68,7 @@ def serialize_query_result(query_result, is_api_user):
def serialize_query_result_to_csv(query_result):
s = cStringIO.StringIO()

query_data = json_loads(query_result.data)
query_data = query_result.data

fieldnames, special_columns = _get_column_lists(query_data['columns'] or [])

@@ -89,7 +89,7 @@ def serialize_query_result_to_csv(query_result):
def serialize_query_result_to_xlsx(query_result):
s = cStringIO.StringIO()

query_data = json_loads(query_result.data)
query_data = query_result.data
book = xlsxwriter.Workbook(s, {'constant_memory': True})
sheet = book.add_worksheet("result")

4 changes: 4 additions & 0 deletions redash/settings/dynamic_settings.py
@@ -10,6 +10,10 @@ def query_time_limit(is_scheduled, user_id, org_id):
return scheduled_time_limit if is_scheduled else adhoc_time_limit


# This provides the ability to override the way we store QueryResult's data column.
# Reference implementation: redash.models.DBPersistence
QueryResultPersistence = None

# Provide any custom tasks you'd like to run periodically
def custom_tasks():
return {
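For illustration, a minimal sketch of what a custom override could look like. ExternalPersistence, the in-memory _EXTERNAL_STORE dict, and the uuid-based key scheme are assumptions standing in for a real backend (Redis, S3, etc.), not part of Redash; the only contract is the data property pair that redash.models.DBPersistence implements:

import uuid

from redash.utils import json_loads

_EXTERNAL_STORE = {}  # stand-in for an external backend; maps key -> serialized JSON string


class ExternalPersistence(object):
    @property
    def data(self):
        if self._data is None:
            return None
        # _data holds the external key; deserialize once and cache the result
        if not hasattr(self, '_deserialized_data'):
            self._deserialized_data = json_loads(_EXTERNAL_STORE[self._data])
        return self._deserialized_data

    @data.setter
    def data(self, data):
        # invalidate the cached deserialization, mirroring DBPersistence
        if hasattr(self, '_deserialized_data'):
            delattr(self, '_deserialized_data')
        if data is None:
            self._data = None
            return
        key = "query_result:%s" % uuid.uuid4().hex  # hypothetical key scheme
        _EXTERNAL_STORE[key] = data  # `data` arrives as a serialized JSON string
        self._data = key  # the DB `data` column then stores only the key


QueryResultPersistence = ExternalPersistence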
6 changes: 5 additions & 1 deletion redash/tasks/queries.py
@@ -385,10 +385,14 @@ def run(self):
self.scheduled_query = models.db.session.merge(self.scheduled_query, load=False)
self.scheduled_query.schedule_failures = 0
models.db.session.add(self.scheduled_query)
query_result, updated_query_ids = models.QueryResult.store_result(

query_result = models.QueryResult.store_result(
self.data_source.org_id, self.data_source,
self.query_hash, self.query, data,
run_time, utcnow())

updated_query_ids = models.Query.update_latest_result(query_result)

models.db.session.commit() # make sure that alert sees the latest query result
self._log_progress('checking_alerts')
for query_id in updated_query_ids:
60 changes: 58 additions & 2 deletions tests/models/test_queries.py
@@ -2,8 +2,8 @@

from tests import BaseTestCase
import datetime
from redash.models import Query, Group, Event, db
from redash.utils import utcnow
from redash.models import Query, QueryResult, Group, Event, db
from redash.utils import utcnow, gen_query_hash
import mock


@@ -375,3 +375,59 @@ def test_fork_from_query_that_has_no_visualization(self):

self.assertEqual(count_table, 1)
self.assertEqual(count_vis, 1)


class TestQueryUpdateLatestResult(BaseTestCase):
def setUp(self):
super(TestQueryUpdateLatestResult, self).setUp()
self.data_source = self.factory.data_source
self.query = "SELECT 1"
self.query_hash = gen_query_hash(self.query)
self.runtime = 123
self.utcnow = utcnow()
self.data = "data"

def test_updates_existing_queries(self):
query1 = self.factory.create_query(query_text=self.query)
query2 = self.factory.create_query(query_text=self.query)
query3 = self.factory.create_query(query_text=self.query)

query_result = QueryResult.store_result(
self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

Query.update_latest_result(query_result)

self.assertEqual(query1.latest_query_data, query_result)
self.assertEqual(query2.latest_query_data, query_result)
self.assertEqual(query3.latest_query_data, query_result)

def test_doesnt_update_queries_with_different_hash(self):
query1 = self.factory.create_query(query_text=self.query)
query2 = self.factory.create_query(query_text=self.query)
query3 = self.factory.create_query(query_text=self.query + "123")

query_result = QueryResult.store_result(
self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

Query.update_latest_result(query_result)

self.assertEqual(query1.latest_query_data, query_result)
self.assertEqual(query2.latest_query_data, query_result)
self.assertNotEqual(query3.latest_query_data, query_result)

def test_doesnt_update_queries_with_different_data_source(self):
query1 = self.factory.create_query(query_text=self.query)
query2 = self.factory.create_query(query_text=self.query)
query3 = self.factory.create_query(query_text=self.query, data_source=self.factory.create_data_source())

query_result = QueryResult.store_result(
self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

Query.update_latest_result(query_result)

self.assertEqual(query1.latest_query_data, query_result)
self.assertEqual(query2.latest_query_data, query_result)
self.assertNotEqual(query3.latest_query_data, query_result)
24 changes: 23 additions & 1 deletion tests/models/test_query_results.py
@@ -1,9 +1,12 @@
#encoding: utf8
import datetime

from unittest import TestCase
from tests import BaseTestCase
from mock import patch

from redash import models
from redash.models import DBPersistence
from redash.utils import utcnow, json_dumps


@@ -66,4 +69,23 @@ def test_store_result_does_not_modify_query_update_at(self):

models.QueryResult.store_result(query.org_id, query.data_source, query.query_hash, query.query_text, "", 0, utcnow())

self.assertEqual(original_updated_at, query.updated_at)
self.assertEqual(original_updated_at, query.updated_at)


class TestDBPersistence(TestCase):
def test_updating_data_removes_cached_result(self):
p = DBPersistence()
p.data = '{"test": 1}'
self.assertDictEqual(p.data, {"test": 1})
p.data = '{"test": 2}'
self.assertDictEqual(p.data, {"test": 2})

@patch('redash.models.json_loads')
def test_calls_json_loads_only_once(self, json_loads_patch):
json_loads_patch.return_value = '1'
p = DBPersistence()
json_data = '{"test": 1}'
p.data = json_data
a = p.data
b = p.data
json_loads_patch.assert_called_once_with(json_data)
6 changes: 4 additions & 2 deletions tests/tasks/test_queries.py
@@ -6,6 +6,7 @@

from tests import BaseTestCase
from redash import redis_connection, models
from redash.utils import json_dumps
from redash.query_runner.pg import PostgreSQL
from redash.tasks.queries import QueryExecutionError, enqueue_query, execute_query

@@ -57,11 +58,12 @@ def test_success(self):
"""
cm = mock.patch("celery.app.task.Context.delivery_info", {'routing_key': 'test'})
with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
qr.return_value = ([1, 2], None)
query_result_data = {"columns": [], "rows": []}
qr.return_value = (json_dumps(query_result_data), None)
result_id = execute_query("SELECT 1, 2", self.factory.data_source.id, {})
self.assertEqual(1, qr.call_count)
result = models.QueryResult.query.get(result_id)
self.assertEqual(result.data, '{1,2}')
self.assertEqual(result.data, query_result_data)

def test_success_scheduled(self):
"""
47 changes: 4 additions & 43 deletions tests/test_models.py
@@ -299,7 +299,7 @@ def test_archive_query_sets_flag(self):
def test_archived_query_doesnt_return_in_all(self):
query = self.factory.create_query(schedule={'interval':'1', 'until':None, 'time': None, 'day_of_week':None})
yesterday = utcnow() - datetime.timedelta(days=1)
query_result, _ = models.QueryResult.store_result(
query_result = models.QueryResult.store_result(
query.org_id, query.data_source, query.query_hash, query.query_text,
"1", 123, yesterday)

@@ -435,59 +435,20 @@ def setUp(self):
self.query_hash = gen_query_hash(self.query)
self.runtime = 123
self.utcnow = utcnow()
self.data = "data"
self.data = '{"a": 1}'

def test_stores_the_result(self):
query_result, _ = models.QueryResult.store_result(
query_result = models.QueryResult.store_result(
self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

self.assertEqual(query_result.data, self.data)
self.assertEqual(query_result._data, self.data)
self.assertEqual(query_result.runtime, self.runtime)
self.assertEqual(query_result.retrieved_at, self.utcnow)
self.assertEqual(query_result.query_text, self.query)
self.assertEqual(query_result.query_hash, self.query_hash)
self.assertEqual(query_result.data_source, self.data_source)

def test_updates_existing_queries(self):
query1 = self.factory.create_query(query_text=self.query)
query2 = self.factory.create_query(query_text=self.query)
query3 = self.factory.create_query(query_text=self.query)

query_result, _ = models.QueryResult.store_result(
self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

self.assertEqual(query1.latest_query_data, query_result)
self.assertEqual(query2.latest_query_data, query_result)
self.assertEqual(query3.latest_query_data, query_result)

def test_doesnt_update_queries_with_different_hash(self):
query1 = self.factory.create_query(query_text=self.query)
query2 = self.factory.create_query(query_text=self.query)
query3 = self.factory.create_query(query_text=self.query + "123")

query_result, _ = models.QueryResult.store_result(
self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

self.assertEqual(query1.latest_query_data, query_result)
self.assertEqual(query2.latest_query_data, query_result)
self.assertNotEqual(query3.latest_query_data, query_result)

def test_doesnt_update_queries_with_different_data_source(self):
query1 = self.factory.create_query(query_text=self.query)
query2 = self.factory.create_query(query_text=self.query)
query3 = self.factory.create_query(query_text=self.query, data_source=self.factory.create_data_source())

query_result, _ = models.QueryResult.store_result(
self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

self.assertEqual(query1.latest_query_data, query_result)
self.assertEqual(query2.latest_query_data, query_result)
self.assertNotEqual(query3.latest_query_data, query_result)


class TestEvents(BaseTestCase):
def raw_event(self):