RQ: periodically clear failed jobs #4306

Merged (11 commits) on Nov 7, 2019
redash/schedule.py (7 changes: 6 additions & 1 deletion)

@@ -1,4 +1,5 @@
 from __future__ import absolute_import
+import logging
 from datetime import datetime, timedelta
 from functools import partial
 from random import randint
@@ -8,9 +9,11 @@
 from redash import settings, rq_redis_connection
 from redash.tasks import (sync_user_details, refresh_queries,
                           empty_schedules, refresh_schemas,
-                          cleanup_query_results,
+                          cleanup_query_results, purge_failed_jobs,
                           version_check, send_aggregated_errors)
 
+logger = logging.getLogger(__name__)
+
 rq_scheduler = Scheduler(connection=rq_redis_connection,
                          queue_name="periodic",
                          interval=5)
@@ -36,6 +39,7 @@ def schedule_periodic_jobs():
         {"func": empty_schedules, "interval": timedelta(minutes=60)},
         {"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)},
         {"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)},
+        {"func": purge_failed_jobs, "interval": timedelta(days=1)},
         {"func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)}
     ]
 
@@ -53,4 +57,5 @@ def schedule_periodic_jobs():
     jobs.extend(settings.dynamic_settings.periodic_jobs() or [])
 
     for job in jobs:
+        logger.info("Scheduling %s with interval %s.", job['func'].__name__, job.get('interval'))
         schedule(**job)
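The dicts above are handed to a small schedule() helper in redash/schedule.py that this PR does not touch, so it does not appear in the diff. Roughly, it converts the interval timedelta to seconds and forwards the rest to rq-scheduler. The sketch below is an approximation of that helper for orientation only; the result_ttl default in it is an assumption, not something this diff shows.

# Approximate sketch of the schedule() helper called above; it lives outside
# this diff, so treat the body (especially the result_ttl default) as an
# illustration. rq_scheduler is the module-level Scheduler created earlier.
from datetime import datetime, timedelta

def schedule(**job):
    """Register one periodic-job dict with the module-level rq_scheduler."""
    interval = job["interval"]
    if isinstance(interval, timedelta):
        interval = int(interval.total_seconds())   # rq-scheduler expects seconds

    rq_scheduler.schedule(
        scheduled_time=datetime.utcnow(),          # start on the next scheduler tick
        func=job["func"],
        interval=interval,
        repeat=None,                               # repeat indefinitely
        result_ttl=job.get("result_ttl", interval * 2),
        timeout=job.get("timeout"),
        ttl=job.get("ttl"),
    )

With that in place, the new entry runs purge_failed_jobs once a day with the default timeout and TTL.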
redash/settings/__init__.py (1 change: 1 addition & 0 deletions)

@@ -197,6 +197,7 @@
 STATIC_ASSETS_PATH = fix_assets_path(os.environ.get("REDASH_STATIC_ASSETS_PATH", "../client/dist/"))
 
 JOB_EXPIRY_TIME = int(os.environ.get("REDASH_JOB_EXPIRY_TIME", 3600 * 12))
+JOB_DEFAULT_FAILURE_TTL = int(os.environ.get("REDASH_JOB_DEFAULT_FAILURE_TTL", 7 * 24 * 60 * 60))
 
 LOG_LEVEL = os.environ.get("REDASH_LOG_LEVEL", "INFO")
 LOG_STDOUT = parse_boolean(os.environ.get('REDASH_LOG_STDOUT', 'false'))
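The default works out to 604800 seconds (7 days). Like the other REDASH_* settings read in this module, it can be overridden through the environment of whichever process imports redash.settings; a minimal illustration, assuming the variable is set before that first import:

# Minimal illustration: shrink the failed-job retention to 24 hours.
# Assumes the variable is set before redash.settings is first imported
# (normally it would be set in the worker/scheduler process environment).
import os

os.environ["REDASH_JOB_DEFAULT_FAILURE_TTL"] = str(24 * 60 * 60)

from redash import settings
assert settings.JOB_DEFAULT_FAILURE_TTL == 86400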
redash/tasks/__init__.py (2 changes: 1 addition & 1 deletion)

@@ -1,4 +1,4 @@
-from .general import record_event, version_check, send_mail, sync_user_details
+from .general import record_event, version_check, send_mail, sync_user_details, purge_failed_jobs
 from .queries import (QueryTask, enqueue_query, execute_query, refresh_queries,
                       refresh_schemas, cleanup_query_results, empty_schedules)
 from .alerts import check_alerts_for_query
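This re-export is what lets the scheduler change above import the task from the package root rather than from the defining module:

# The package-level re-export used by redash/schedule.py in this PR.
from redash.tasks import purge_failed_jobs

# It resolves to the function defined in redash/tasks/general.py below.
print(purge_failed_jobs.__module__)   # redash.tasks.general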
redash/tasks/general.py (20 changes: 19 additions & 1 deletion)

@@ -1,7 +1,11 @@
 import requests
+from datetime import datetime
 
 from flask_mail import Message
-from redash import mail, models, settings
+from rq import Connection, Queue
+from rq.registry import FailedJobRegistry
+from rq.job import Job
+from redash import mail, models, settings, rq_redis_connection
 from redash.models import users
 from redash.version_check import run_version_check
 from redash.worker import job, get_job_logger
@@ -60,3 +64,17 @@ def send_mail(to, subject, html, text):
 
 def sync_user_details():
     users.sync_last_active_at()
+
+
+def purge_failed_jobs():
+    with Connection(rq_redis_connection):
+        for queue in Queue.all():
+            failed_job_ids = FailedJobRegistry(queue=queue).get_job_ids()
+            failed_jobs = Job.fetch_many(failed_job_ids, rq_redis_connection)
+            stale_jobs = [job for job in failed_jobs if job and (datetime.utcnow() - job.ended_at).total_seconds() > settings.JOB_DEFAULT_FAILURE_TTL]
+
+            for job in stale_jobs:
+                job.delete()
+
+            if stale_jobs:
+                logger.info('Purged %d old failed jobs from the %s queue.', len(stale_jobs), queue.name)
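To see what a run of purge_failed_jobs() would act on, the per-queue failed-job registries can be inspected directly. A standalone sketch, assuming a local Redis rather than Redash's configured rq_redis_connection:

# Standalone sketch: list every queue's failed jobs and how long ago they
# failed. Assumes a local Redis; inside Redash the connection would be
# redash.rq_redis_connection. Anything older than JOB_DEFAULT_FAILURE_TTL
# (7 days by default) is deleted by the next daily purge_failed_jobs run.
from datetime import datetime

from redis import Redis
from rq import Connection, Queue
from rq.job import Job
from rq.registry import FailedJobRegistry

with Connection(Redis()):
    for queue in Queue.all():
        for job_id in FailedJobRegistry(queue=queue).get_job_ids():
            job = Job.fetch(job_id)
            if job.ended_at is None:
                continue
            # total_seconds() rather than .seconds: the latter is capped at
            # 86399 and can never exceed a multi-day TTL.
            age = (datetime.utcnow() - job.ended_at).total_seconds()
            print("%s / %s failed %d seconds ago" % (queue.name, job_id, age))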