diff --git a/client/app/pages/settings/OrganizationSettings.jsx b/client/app/pages/settings/OrganizationSettings.jsx
index 77f3d11f92..75bdfbbf0e 100644
--- a/client/app/pages/settings/OrganizationSettings.jsx
+++ b/client/app/pages/settings/OrganizationSettings.jsx
@@ -164,6 +164,14 @@ class OrganizationSettings extends React.Component {
Enable multi-byte (Chinese, Japanese, and Korean) search for query names and descriptions (slower)
+              <Checkbox
+                name="send_email_on_failed_scheduled_queries"
+                checked={settings.send_email_on_failed_scheduled_queries}
+                onChange={e => this.handleChange('send_email_on_failed_scheduled_queries', e.target.checked)}
+              >
+                Email query owners when scheduled queries fail
+              </Checkbox>
+
diff --git a/redash/tasks/failure_report.py b/redash/tasks/failure_report.py
new file mode 100644
--- /dev/null
+++ b/redash/tasks/failure_report.py
@@ -0,0 +1,63 @@
+import datetime
+import re
+from collections import Counter
+
+from flask import render_template
+
+from redash import models, redis_connection, settings
+from redash.tasks.general import send_mail
+from redash.utils import json_dumps, json_loads, base_url
+from redash.worker import celery
+
+
+def key(user_id):
+    return 'aggregated_failures:{}'.format(user_id)
+
+
+def comment_for(failure):
+    schedule_failures = failure.get('schedule_failures')
+    if schedule_failures > settings.MAX_FAILURE_REPORTS_PER_QUERY * 0.75:
+ return """NOTICE: This query has failed a total of {schedule_failures} times.
+ Reporting may stop when the query exceeds {max_failure_reports} overall failures.""".format(
+ schedule_failures=schedule_failures,
+ max_failure_reports=settings.MAX_FAILURE_REPORTS_PER_QUERY
+ )
+
+
+@celery.task(name="redash.tasks.send_aggregated_errors")
+def send_aggregated_errors():
+ for key in redis_connection.scan_iter(key("*")):
+ user_id = re.search(r'\d+', key).group()
+ send_failure_report(user_id)
+
+
+def send_failure_report(user_id):
+ user = models.User.get_by_id(user_id)
+ errors = [json_loads(e) for e in redis_connection.lrange(key(user_id), 0, -1)]
+
+ if errors:
+ errors.reverse()
+ occurrences = Counter((e.get('id'), e.get('message')) for e in errors)
+ unique_errors = {(e.get('id'), e.get('message')): e for e in errors}
+
+ context = {
+ 'failures': [{
+ 'id': v.get('id'),
+ 'name': v.get('name'),
+ 'failed_at': v.get('failed_at'),
+ 'failure_reason': v.get('message'),
+ 'failure_count': occurrences[k],
+ 'comment': comment_for(v)
+ } for k, v in unique_errors.iteritems()],
+ 'base_url': base_url(user.org)
+ }
+
+ html = render_template('emails/failures.html', **context)
+ text = render_template('emails/failures.txt', **context)
+ subject = "Redash failed to execute {} of your scheduled queries".format(len(unique_errors.keys()))
+ send_mail.delay([user.email], subject, html, text)
+
+ redis_connection.delete(key(user_id))
+
+
+def notify_of_failure(message, query):
+ subscribed = query.org.get_setting('send_email_on_failed_scheduled_queries')
+ exceeded_threshold = query.schedule_failures >= settings.MAX_FAILURE_REPORTS_PER_QUERY
+
+ if subscribed and not query.user.is_disabled and not exceeded_threshold:
+ redis_connection.lpush(key(query.user.id), json_dumps({
+ 'id': query.id,
+ 'name': query.name,
+ 'message': message,
+ 'schedule_failures': query.schedule_failures,
+ 'failed_at': datetime.datetime.utcnow().strftime("%B %d, %Y %I:%M%p UTC")
+ }))
diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py
index b54c831b55..e923639bce 100644
--- a/redash/tasks/queries.py
+++ b/redash/tasks/queries.py
@@ -1,7 +1,6 @@
import logging
import signal
import time
-
import redis
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from celery.result import AsyncResult
@@ -11,10 +10,12 @@
from redash import models, redis_connection, settings, statsd_client
from redash.query_runner import InterruptException
from redash.tasks.alerts import check_alerts_for_query
+from redash.tasks.failure_report import notify_of_failure
from redash.utils import gen_query_hash, json_dumps, utcnow, mustache_render
from redash.worker import celery
logger = get_task_logger(__name__)
+TIMEOUT_MESSAGE = "Query exceeded Redash query execution time limit."
def _job_lock_id(query_hash, data_source_id):
@@ -56,7 +57,7 @@ def to_dict(self):
status = self.STATUSES[task_status]
if isinstance(result, (TimeLimitExceeded, SoftTimeLimitExceeded)):
- error = "Query exceeded Redash query execution time limit."
+ error = TIMEOUT_MESSAGE
status = 4
elif isinstance(result, Exception):
error = result.message
@@ -142,7 +143,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
result = execute_query.apply_async(args=args,
argsrepr=argsrepr,
queue=queue_name,
- time_limit=time_limit)
+ soft_time_limit=time_limit)
job = QueryTask(async_result=result)
logging.info("[%s] Created new job: %s", query_hash, job.id)
@@ -338,7 +339,11 @@ def run(self):
try:
data, error = query_runner.run_query(annotated_query, self.user)
except Exception as e:
- error = text_type(e)
+ if isinstance(e, SoftTimeLimitExceeded):
+ error = TIMEOUT_MESSAGE
+ else:
+ error = text_type(e)
+
data = None
logging.warning('Unexpected error while running query:', exc_info=1)
@@ -354,6 +359,7 @@ def run(self):
self.scheduled_query = models.db.session.merge(self.scheduled_query, load=False)
self.scheduled_query.schedule_failures += 1
models.db.session.add(self.scheduled_query)
+ notify_of_failure(error, self.scheduled_query)
models.db.session.commit()
raise result
else:
@@ -411,5 +417,6 @@ def execute_query(self, query, data_source_id, metadata, user_id=None,
scheduled_query = models.Query.query.get(scheduled_query_id)
else:
scheduled_query = None
+
return QueryExecutor(self, query, data_source_id, user_id, is_api_key, metadata,
scheduled_query).run()
diff --git a/redash/templates/emails/failures.html b/redash/templates/emails/failures.html
new file mode 100644
index 0000000000..086bd8bd26
--- /dev/null
+++ b/redash/templates/emails/failures.html
@@ -0,0 +1,41 @@
+<!DOCTYPE html>
+<html>
+  <head>
+    <meta charset="utf-8">
+    <title>Redash failed to execute your scheduled queries</title>
+  </head>
+  <body>
+    <table>
+      <tr>
+        <td>
+          <h2>Redash failed to run the following queries:</h2>
+
+          {% for failure in failures %}
+          <div>
+            <h3>
+              <a href="{{base_url}}/queries/{{failure.id}}">{{failure.name}}</a>
+            </h3>
+            <p>
+              Last failed: {{failure.failed_at}}
+              {% if failure.failure_count > 1 %}
+              <br>
+              + {{failure.failure_count - 1}} more failures since last report
+              {% endif %}
+            </p>
+            <h4>Exception</h4>
+            <pre>{{failure.failure_reason}}</pre>
+
+            {% if failure.comment %}
+            <p>{{failure.comment}}</p>
+            {% endif %}
+          </div>
+          {% endfor %}
+        </td>
+      </tr>
+    </table>
+  </body>
+</html>
diff --git a/redash/templates/emails/failures.txt b/redash/templates/emails/failures.txt
new file mode 100644
index 0000000000..9837f345c6
--- /dev/null
+++ b/redash/templates/emails/failures.txt
@@ -0,0 +1,15 @@
+Redash failed to run the following queries:
+
+{% for failure in failures %}
+
+{{failure.name}} ({{base_url}}/queries/{{failure.id}})
+Last failed: {{failure.failed_at}}{% if failure.failure_count > 1 %} + {{failure.failure_count - 1}} more failures since last report{% endif %}
+Exception:
+
+{{failure.failure_reason}}
+
+{% if failure.comment %}
+{{failure.comment}}
+{% endif %}
+
+{% endfor %}
\ No newline at end of file
diff --git a/redash/worker.py b/redash/worker.py
index 4caca5bbf3..292610d2ef 100644
--- a/redash/worker.py
+++ b/redash/worker.py
@@ -38,6 +38,10 @@
'sync_user_details': {
'task': 'redash.tasks.sync_user_details',
'schedule': timedelta(minutes=1),
+ },
+ 'send_aggregated_errors': {
+ 'task': 'redash.tasks.send_aggregated_errors',
+ 'schedule': timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL),
}
}
diff --git a/tests/tasks/test_failure_report.py b/tests/tasks/test_failure_report.py
new file mode 100644
index 0000000000..855b54ca1c
--- /dev/null
+++ b/tests/tasks/test_failure_report.py
@@ -0,0 +1,99 @@
+from unittest import TestCase
+
+import mock
+from freezegun import freeze_time
+import dateutil
+
+from tests import BaseTestCase
+from redash import redis_connection, models, settings
+from redash.tasks.failure_report import notify_of_failure, send_failure_report, send_aggregated_errors, key
+from redash.utils import json_loads
+
+class TestSendAggregatedErrorsTask(BaseTestCase):
+ def setUp(self):
+ super(TestSendAggregatedErrorsTask, self).setUp()
+ redis_connection.flushall()
+ self.factory.org.set_setting('send_email_on_failed_scheduled_queries', True)
+
+ def notify(self, message="Oh no, I failed!", query=None, **kwargs):
+ if query is None:
+ query = self.factory.create_query(**kwargs)
+
+ notify_of_failure(message, query)
+ return key(query.user.id)
+
+ @mock.patch('redash.tasks.failure_report.render_template')
+ def send_email(self, user, render_template):
+ send_failure_report(user.id)
+
+ _, context = render_template.call_args
+ return context['failures']
+
+ def test_schedules_email_if_failure_count_is_beneath_limit(self):
+ key = self.notify(schedule_failures=settings.MAX_FAILURE_REPORTS_PER_QUERY - 1)
+ email_pending = redis_connection.exists(key)
+ self.assertTrue(email_pending)
+
+ def test_does_not_report_if_failure_count_is_beyond_limit(self):
+ key = self.notify(schedule_failures=settings.MAX_FAILURE_REPORTS_PER_QUERY)
+ email_pending = redis_connection.exists(key)
+ self.assertFalse(email_pending)
+
+ def test_does_not_report_if_organization_is_not_subscribed(self):
+ self.factory.org.set_setting('send_email_on_failed_scheduled_queries', False)
+ key = self.notify()
+ email_pending = redis_connection.exists(key)
+ self.assertFalse(email_pending)
+
+ def test_does_not_report_if_query_owner_is_disabled(self):
+ self.factory.user.disable()
+ key = self.notify()
+ email_pending = redis_connection.exists(key)
+ self.assertFalse(email_pending)
+
+ def test_does_not_indicate_when_not_near_limit_for_a_query(self):
+ self.notify(schedule_failures=settings.MAX_FAILURE_REPORTS_PER_QUERY / 2)
+ failures = self.send_email(self.factory.user)
+
+ self.assertFalse(failures[0]['comment'])
+
+ def test_indicates_when_near_limit_for_a_query(self):
+ self.notify(schedule_failures=settings.MAX_FAILURE_REPORTS_PER_QUERY - 1)
+ failures = self.send_email(self.factory.user)
+
+ self.assertTrue(failures[0]['comment'])
+
+ def test_aggregates_different_queries_in_a_single_report(self):
+ key1 = self.notify(message="I'm a failure")
+ key2 = self.notify(message="I'm simply not a success")
+
+ self.assertEqual(key1, key2)
+
+ def test_counts_failures_for_each_reason(self):
+ query = self.factory.create_query()
+
+ self.notify(message="I'm a failure", query=query)
+ self.notify(message="I'm a failure", query=query)
+ self.notify(message="I'm a different type of failure", query=query)
+ self.notify(message="I'm a totally different query")
+
+ failures = self.send_email(query.user)
+
+ f1 = next(f for f in failures if f["failure_reason"] == "I'm a failure")
+ self.assertEqual(2, f1['failure_count'])
+ f2 = next(f for f in failures if f["failure_reason"] == "I'm a different type of failure")
+ self.assertEqual(1, f2['failure_count'])
+ f3 = next(f for f in failures if f["failure_reason"] == "I'm a totally different query")
+ self.assertEqual(1, f3['failure_count'])
+
+ def test_shows_latest_failure_time(self):
+ query = self.factory.create_query()
+
+ with freeze_time("2000-01-01"):
+ self.notify(query=query)
+
+ self.notify(query=query)
+
+ failures = self.send_email(query.user)
+ latest_failure = dateutil.parser.parse(failures[0]['failed_at'])
+ self.assertNotEqual(2000, latest_failure.year)
diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py
index 758d6e5402..d542c3991e 100644
--- a/tests/tasks/test_queries.py
+++ b/tests/tasks/test_queries.py
@@ -36,7 +36,7 @@ def test_limits_query_time(self, _):
enqueue_query(query.query_text, query.data_source, query.user_id, False, query, {'Username': 'Arik', 'Query ID': query.id})
_, kwargs = execute_query.apply_async.call_args
- self.assertEqual(60, kwargs.get('time_limit'))
+ self.assertEqual(60, kwargs.get('soft_time_limit'))
def test_multiple_enqueue_of_different_query(self):
query = self.factory.create_query()