Skip to content

Commit

Permalink
refresh_queries shouldn't break because of a single query having a bad schedule object (#4163)
Browse files Browse the repository at this point in the history

* move filtering of invalid schedules to the query

* simplify retrieved_at assignment and wrap in a try/except block to avoid one query blowing up the rest

* refactor refresh_queries to use simpler functions with a single responsibility and add try/except blocks to avoid one query blowing up the rest

* avoid blowing up when job locks point to expired Job objects. Enqueue them again instead

* there's no need to check for the existence of interval - all schedules have intervals

* disable faulty schedules

* reduce FP style in refresh_queries

* report refresh_queries errors to Sentry (if it is configured)

* avoid using exists+fetch and use exceptions instead
  • Loading branch information
Omer Lachish authored Mar 1, 2020
1 parent b0f1cdd commit a9cb87d
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 273 deletions.
56 changes: 31 additions & 25 deletions redash/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
json_loads,
mustache_render,
base_url,
sentry,
)
from redash.utils.configuration import ConfigurationContainer
from redash.models.parameterized_query import ParameterizedQuery
Expand Down Expand Up @@ -630,34 +631,39 @@ def outdated_queries(cls):
scheduled_queries_executions.refresh()

for query in queries:
if query.schedule["interval"] is None:
continue
try:
if query.schedule.get("disabled"):
continue

if query.schedule["until"] is not None:
schedule_until = pytz.utc.localize(
datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d")
)
if query.schedule["until"]:
schedule_until = pytz.utc.localize(
datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d")
)

if schedule_until <= now:
continue
if schedule_until <= now:
continue

retrieved_at = scheduled_queries_executions.get(query.id) or (
query.latest_query_data and query.latest_query_data.retrieved_at
)

if query.latest_query_data:
retrieved_at = query.latest_query_data.retrieved_at
else:
retrieved_at = now

retrieved_at = scheduled_queries_executions.get(query.id) or retrieved_at

if should_schedule_next(
retrieved_at,
now,
query.schedule["interval"],
query.schedule["time"],
query.schedule["day_of_week"],
query.schedule_failures,
):
key = "{}:{}".format(query.query_hash, query.data_source_id)
outdated_queries[key] = query
if should_schedule_next(
retrieved_at or now,
now,
query.schedule["interval"],
query.schedule["time"],
query.schedule["day_of_week"],
query.schedule_failures,
):
key = "{}:{}".format(query.query_hash, query.data_source_id)
outdated_queries[key] = query
except Exception as e:
query.schedule["disabled"] = True
db.session.commit()

message = "Could not determine if query %d is outdated due to %s. The schedule for this query has been disabled." % (query.id, repr(e))
logging.info(message)
sentry.capture_message(message)

return list(outdated_queries.values())

Expand Down
27 changes: 17 additions & 10 deletions redash/tasks/queries/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from rq import get_current_job
from rq.job import JobStatus
from rq.timeouts import JobTimeoutException
from rq.exceptions import NoSuchJobError

from redash import models, redis_connection, settings
from redash.query_runner import InterruptException
Expand Down Expand Up @@ -43,16 +44,22 @@ def enqueue_query(
job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
if job_id:
logger.info("[%s] Found existing job: %s", query_hash, job_id)

job = Job.fetch(job_id)

status = job.get_status()
if status in [JobStatus.FINISHED, JobStatus.FAILED]:
logger.info(
"[%s] job found is ready (%s), removing lock",
query_hash,
status,
)
job_complete = None

try:
job = Job.fetch(job_id)
job_exists = True
status = job.get_status()
job_complete = status in [JobStatus.FINISHED, JobStatus.FAILED]

if job_complete:
message = "job found is complete (%s)" % status
except NoSuchJobError:
message = "job found has expired"
job_exists = False

if job_complete or not job_exists:
logger.info("[%s] %s, removing lock", query_hash, message)
redis_connection.delete(_job_lock_id(query_hash, data_source.id))
job = None

Expand Down
150 changes: 72 additions & 78 deletions redash/tasks/queries/maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
QueryDetachedFromDataSourceError,
)
from redash.tasks.failure_report import track_failure
from redash.utils import json_dumps
from redash.utils import json_dumps, sentry
from redash.worker import job, get_job_logger

from .execution import enqueue_query
Expand All @@ -27,85 +27,79 @@ def empty_schedules():
logger.info("Deleted %d schedules.", len(queries))


def refresh_queries():
logger.info("Refreshing queries...")

outdated_queries_count = 0
query_ids = []

with statsd_client.timer("manager.outdated_queries_lookup"):
for query in models.Query.outdated_queries():
if settings.FEATURE_DISABLE_REFRESH_QUERIES:
logger.info("Disabled refresh queries.")
elif query.org.is_disabled:
logger.debug(
"Skipping refresh of %s because org is disabled.", query.id
)
elif query.data_source is None:
logger.debug(
"Skipping refresh of %s because the datasource is none.", query.id
)
elif query.data_source.paused:
logger.debug(
"Skipping refresh of %s because datasource - %s is paused (%s).",
query.id,
query.data_source.name,
query.data_source.pause_reason,
)
else:
query_text = query.query_text

parameters = {p["name"]: p.get("value") for p in query.parameters}
if any(parameters):
try:
query_text = query.parameterized.apply(parameters).query
except InvalidParameterError as e:
error = u"Skipping refresh of {} because of invalid parameters: {}".format(
query.id, str(e)
)
track_failure(query, error)
continue
except QueryDetachedFromDataSourceError as e:
error = (
"Skipping refresh of {} because a related dropdown "
"query ({}) is unattached to any datasource."
).format(query.id, e.query_id)
track_failure(query, error)
continue

enqueue_query(
query_text,
query.data_source,
query.user_id,
scheduled_query=query,
metadata={"Query ID": query.id, "Username": "Scheduled"},
)

query_ids.append(query.id)
outdated_queries_count += 1

statsd_client.gauge("manager.outdated_queries", outdated_queries_count)

logger.info(
"Done refreshing queries. Found %d outdated queries: %s"
% (outdated_queries_count, query_ids)
)

status = redis_connection.hgetall("redash:status")
now = time.time()
def _should_refresh_query(query):
    """Return True when *query* is eligible for a scheduled refresh.

    Each disqualifying condition is checked as a guard clause and the
    reason for skipping is logged before returning False.
    """
    if settings.FEATURE_DISABLE_REFRESH_QUERIES:
        # Global kill switch for all scheduled refreshes.
        logger.info("Disabled refresh queries.")
        return False

    if query.org.is_disabled:
        logger.debug("Skipping refresh of %s because org is disabled.", query.id)
        return False

    if query.data_source is None:
        logger.debug("Skipping refresh of %s because the datasource is none.", query.id)
        return False

    if query.data_source.paused:
        logger.debug(
            "Skipping refresh of %s because datasource - %s is paused (%s).",
            query.id,
            query.data_source.name,
            query.data_source.pause_reason,
        )
        return False

    return True


def _apply_default_parameters(query):
    """Return the query text with the query's default parameter values applied.

    Queries without parameters get their raw ``query_text`` back untouched.
    When parameter substitution fails, the failure is recorded via
    ``track_failure`` (so it appears in failure reports) and the original
    exception is re-raised for the caller to handle.

    Raises:
        InvalidParameterError: when a supplied parameter value is invalid.
        QueryDetachedFromDataSourceError: when a dropdown parameter's backing
            query is not attached to any data source.
    """
    parameters = {p["name"]: p.get("value") for p in query.parameters}

    # NOTE: the original used `if any(parameters)`, which tests truthiness of
    # the dict's *keys* rather than whether any parameters exist; plain dict
    # truthiness is the correct check.
    if not parameters:
        return query.query_text

    try:
        return query.parameterized.apply(parameters).query
    except InvalidParameterError as e:
        error = u"Skipping refresh of {} because of invalid parameters: {}".format(
            query.id, str(e)
        )
        track_failure(query, error)
        raise
    except QueryDetachedFromDataSourceError as e:
        error = (
            "Skipping refresh of {} because a related dropdown "
            "query ({}) is unattached to any datasource."
        ).format(query.id, e.query_id)
        track_failure(query, error)
        raise

redis_connection.hmset(
"redash:status",
{
"outdated_queries_count": outdated_queries_count,
"last_refresh_at": now,
"query_ids": json_dumps(query_ids),
},
)

statsd_client.gauge(
"manager.seconds_since_refresh", now - float(status.get("last_refresh_at", now))
)
def refresh_queries():
    """Enqueue an execution for every outdated scheduled query.

    A failure to enqueue a single query is reported to the log (and to
    Sentry, when configured) without aborting the rest of the refresh
    cycle. A summary of the run is written to the "redash:status" Redis
    hash and logged at the end.
    """
    logger.info("Refreshing queries...")
    enqueued = []
    for query in models.Query.outdated_queries():
        if not _should_refresh_query(query):
            continue

        try:
            enqueue_query(
                _apply_default_parameters(query),
                query.data_source,
                query.user_id,
                scheduled_query=query,
                metadata={"Query ID": query.id, "Username": "Scheduled"},
            )
            enqueued.append(query)
        except Exception as e:
            # One bad query must not stop the whole refresh loop.
            message = "Could not enqueue query %d due to %s" % (query.id, repr(e))
            # Use the module-level job logger: the original `logging.info`
            # wrote to the root logger, bypassing the worker's log setup.
            logger.error(message)
            sentry.capture_message(message)

    status = {
        "outdated_queries_count": len(enqueued),
        "last_refresh_at": time.time(),
        "query_ids": json_dumps([q.id for q in enqueued]),
    }

    redis_connection.hmset("redash:status", status)
    # Lazy %-args instead of eager string interpolation.
    logger.info("Done refreshing queries: %s", status)


def cleanup_query_results():
Expand Down
4 changes: 4 additions & 0 deletions redash/utils/sentry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sentry_sdk
from funcy import iffy
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.redis import RedisIntegration
Expand Down Expand Up @@ -33,3 +34,6 @@ def init():
RqIntegration(),
],
)


capture_message = iffy(lambda _: settings.SENTRY_DSN, sentry_sdk.capture_message)
28 changes: 28 additions & 0 deletions tests/tasks/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from mock import patch, Mock

from rq import Connection
from rq.exceptions import NoSuchJobError

from tests import BaseTestCase
from redash import redis_connection, rq_redis_connection, models
Expand Down Expand Up @@ -67,6 +68,33 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _):

self.assertEqual(1, enqueue.call_count)

def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job):
query = self.factory.create_query()

with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.data_source,
query.user_id,
False,
query,
{"Username": "Arik", "Query ID": query.id},
)

# "expire" the previous job
fetch_job.side_effect = NoSuchJobError

enqueue_query(
query.query_text,
query.data_source,
query.user_id,
False,
query,
{"Username": "Arik", "Query ID": query.id},
)

self.assertEqual(2, enqueue.call_count)

@patch("redash.settings.dynamic_settings.query_time_limit", return_value=60)
def test_limits_query_time(self, _, enqueue, __):
query = self.factory.create_query()
Expand Down
Loading

0 comments on commit a9cb87d

Please sign in to comment.