Skip to content

Commit

Permalink
Reset the database lock if pid changed
Browse files Browse the repository at this point in the history
  • Loading branch information
arikfr committed May 18, 2014
1 parent 4af979d commit 8e1c852
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 15 deletions.
21 changes: 13 additions & 8 deletions redash/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ def setup_logging():

events.setup_logging(settings.EVENTS_LOG_PATH, settings.EVENTS_CONSOLE_OUTPUT)

setup_logging()

redis_url = urlparse.urlparse(settings.REDIS_URL)
if redis_url.path:
redis_db = redis_url.path[1]
else:
redis_db = 0
def create_redis_connection():
redis_url = urlparse.urlparse(settings.REDIS_URL)
if redis_url.path:
redis_db = redis_url.path[1]
else:
redis_db = 0

r = redis.StrictRedis(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_url.password)

return r

# TODO: move this to function that create a connection?
redis_connection = redis.StrictRedis(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_url.password)

setup_logging()
redis_connection = create_redis_connection()
statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX)
12 changes: 12 additions & 0 deletions redash/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import hashlib
import logging
import os
import threading
import time
import datetime
import itertools
Expand All @@ -19,18 +21,28 @@ def __init__(self):
self.database_name = self.database_config.pop('name')
self.database = peewee.PostgresqlDatabase(self.database_name, **self.database_config)
self.app = None
self.pid = os.getpid()

def init_app(self, app):
self.app = app
self.register_handlers()

def connect_db(self):
self._check_pid()
self.database.connect()

def close_db(self, exc):
self._check_pid()
if not self.database.is_closed():
self.database.close()

def _check_pid(self):
current_pid = os.getpid()
if self.pid != current_pid:
logging.info("New pid detected (%d!=%d); resetting database lock.", self.pid, current_pid)
self.pid = os.getpid()
self.database._conn_lock = threading.Lock()

def register_handlers(self):
self.app.before_request(self.connect_db)
self.app.teardown_request(self.close_db)
Expand Down
26 changes: 19 additions & 7 deletions redash/tasks.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
import time
import datetime
from celery.utils.log import get_task_logger
import logging
from celery.result import AsyncResult
import redis
from redash.data.query_runner import get_query_runner
from redash import models, redis_connection, statsd_client
from redash.worker import celery
from celery import Task
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from redash import redis_connection, models, statsd_client
from redash.utils import gen_query_hash
from redash.worker import celery
from redash.data.query_runner import get_query_runner

logger = get_task_logger(__name__)


class BaseTask(Task):
abstract = True

def after_return(self, *args, **kwargs):
models.db.close_db(None)

def __call__(self, *args, **kwargs):
models.db.connect_db()
return super(BaseTask, self).__call__(*args, **kwargs)


class QueryTask(object):
MAX_RETRIES = 5

Expand Down Expand Up @@ -105,7 +117,7 @@ def cancel(self):
return self._async_result.revoke(terminate=True)


@celery.task
@celery.task(base=BaseTask)
def refresh_queries():
# self.status['last_refresh_at'] = time.time()
# self._save_status()
Expand Down Expand Up @@ -134,7 +146,7 @@ def refresh_queries():

statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now)))

@celery.task(bind=True, track_started=True)
@celery.task(bind=True, base=BaseTask, track_started=True)
def execute_query(self, query, data_source_id):
# TODO: maybe this should be a class?
start_time = time.time()
Expand Down

0 comments on commit 8e1c852

Please sign in to comment.