Skip to content

Commit

Permalink
Add global request context to db session (#44)
Browse files Browse the repository at this point in the history
with DBSession() is no longer required for datasources
every flask request will share the same db session
  • Loading branch information
czgu authored May 8, 2020
1 parent ed20c95 commit b2b612c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 17 deletions.
17 changes: 15 additions & 2 deletions datahub/server/app/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
from werkzeug.exceptions import Forbidden, NotFound

from app.flask_app import flask_app, socketio, limiter
from logic.impression import create_impression
from app.db import get_session
from const.datasources import DS_PATH
from lib.logger import get_logger
from logic.impression import create_impression

LOG = get_logger(__file__)
_host = socket.gethostname()
Expand Down Expand Up @@ -53,9 +54,9 @@ def handler(**kwargs):
else:
params = flask.request.json or {}

status = 200
try:
kwargs.update(params)
status = 200
results = fn(**kwargs)

if not custom_response:
Expand Down Expand Up @@ -83,6 +84,8 @@ def handler(**kwargs):
# sample_rate=1,
# tags={'url': url.replace('/', '.').strip('.')})
finally:
if status != 200 and "database_session" in flask.g:
flask.g.database_session.rollback()
# TODO: implement latency check
pass
# latency_ms = time_utils.now_millis() - start_time
Expand Down Expand Up @@ -189,3 +192,13 @@ def abort_request(
status_code=500, message=None,
):
raise RequestException(message, status_code)


@flask_app.teardown_appcontext
def teardown_database_session(error):
"""Clean up the db connection at the end of request
"""
database_session = flask.g.pop("database_session", None)
if database_session is not None:
get_session().remove()
36 changes: 31 additions & 5 deletions datahub/server/app/db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from contextlib import contextmanager
import functools

from flask import has_app_context, g, _app_ctx_stack
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine
Expand Down Expand Up @@ -39,13 +40,27 @@ def get_db_engine(
return __engine


def get_session():
def get_session(scopefunc=None):
"""Create a global bound scoped_session
Returns:
[type] -- [description]
"""
global __session
if not __session:
__session = scoped_session(sessionmaker(bind=get_db_engine()))
__session = scoped_session(
sessionmaker(bind=get_db_engine()), scopefunc=scopefunc
)
return __session


def get_flask_db_session():
if "database_session" not in g:
g.database_session = get_session(scopefunc=_app_ctx_stack.__ident_func__)()
return g.database_session


def with_session(fn):
"""Decorator for handling sessions."""

Expand All @@ -55,8 +70,12 @@ def func(*args, **kwargs):
# If there's no session, create a new one. We will
# automatically close this after the function is called.
if not kwargs.get("session"):
session = get_session()()
kwargs["session"] = session
# By default we try to use global flask db session first
if has_app_context():
kwargs["session"] = get_flask_db_session()
else: # If not in a flask context then create our own session
session = get_session()()
kwargs["session"] = session

try:
return fn(*args, **kwargs)
Expand All @@ -80,7 +99,14 @@ def func(*args, **kwargs):

@contextmanager
def DBSession():
"""SQLAlchemy database connection"""
# If inside a flask request
# return the flask db session
if has_app_context():
yield get_flask_db_session()
return

# Otherwise create the session as normal
# and teardown at the end
session = get_session()()
try:
yield session
Expand Down
6 changes: 3 additions & 3 deletions datahub/server/app/server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from flask import send_from_directory

from app.flask_app import flask_app, limiter
from const.path import WEBAPP_PATH
from app import auth
from app.datasource import register, abort_request

from app import auth
from app.flask_app import flask_app, limiter
from const.path import WEBAPP_PATH

import datasources
import datasources_socketio
Expand Down
11 changes: 4 additions & 7 deletions datahub/server/datasources/query_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,10 @@ def create_query_execution(query, engine_id, data_cell_id=None, originator=None)

@register("/query_execution/<int:query_execution_id>/", methods=["GET"])
def get_query_execution(query_execution_id):
with DBSession() as session:
execution = logic.get_query_execution_by_id(query_execution_id, session=session)
verify_query_engine_permission(execution.engine_id, session=session)

execution_dict = execution.to_dict(True) if execution is not None else None

return execution_dict
execution = logic.get_query_execution_by_id(query_execution_id)
verify_query_engine_permission(execution.engine_id)
execution_dict = execution.to_dict(True) if execution is not None else None
return execution_dict


@register("/batch/query_execution/", methods=["POST"])
Expand Down

0 comments on commit b2b612c

Please sign in to comment.