Skip to content

Commit

Permalink
Implement Backend.get_session to retrieve scoped session
Browse files Browse the repository at this point in the history
The scoped session is an instance of SqlAlchemy's `Session` class that
is used by the query builder to connect to the database, for both the
SqlAlchemy database backend as well as for Django. Both database
backends need to maintain their own scoped session factory which can be
called to get a session instance.

Certain applications need access to the session. For example,
applications that run AiiDA in a threaded way, such as a REST API server
need to manually close the session after the query has finished because
this is not done automatically when the thread ends. The associated
database connection remains open causing an eventual timeout when a new
request comes in. The method `Backend.get_session` provides an official
API to access the global scoped session instance which can then be
closed.

Additionally, a lot of code that was duplicated across the two
implementations of the `QueryBuilder` for the two database backends has
been moved to the abstract `BackendQueryBuilder`. Normally this code
does indeed belong in the implementations but since the current
implementation for both backends is based on SqlAlchemy they are both
nearly identical. When in the future a new backend is implemented that
does not use SqlAlchemy the current code can be factored out to a
specific `SqlAlchemyQueryBuilder` that can be used for both database
backends.
  • Loading branch information
sphuber committed Jan 31, 2020
1 parent 8cd0b60 commit 09cb6cb
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 467 deletions.
54 changes: 54 additions & 0 deletions aiida/backends/djsite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,57 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
# pylint: disable=global-statement
"""Module with implementation of the database backend using Django."""
from aiida.backends.utils import create_sqlalchemy_engine, create_scoped_session_factory

ENGINE = None
SESSION_FACTORY = None


def reset_session():
"""Reset the session which means setting the global engine and session factory instances to `None`."""
global ENGINE
global SESSION_FACTORY

if ENGINE is not None:
ENGINE.dispose()

if SESSION_FACTORY is not None:
SESSION_FACTORY.expunge_all() # pylint: disable=no-member
SESSION_FACTORY.close() # pylint: disable=no-member

ENGINE = None
SESSION_FACTORY = None


def get_scoped_session():
"""Return a scoped session for the given profile that is exclusively to be used for the `QueryBuilder`.
Since the `QueryBuilder` implementation uses SqlAlchemy to map the query onto the models in order to generate the
SQL to be sent to the database, it requires a session, which is an :class:`sqlalchemy.orm.session.Session` instance.
The only purpose is for SqlAlchemy to be able to connect to the database perform the query and retrieve the results.
Even the Django backend implementation will use SqlAlchemy for its `QueryBuilder` and so also needs an SqlA session.
It is important that we do not reuse the scoped session factory in the SqlAlchemy implementation, because that runs
the risk of cross-talk once profiles can be switched dynamically in a single python interpreter. Therefore the
Django implementation of the `QueryBuilder` should keep its own SqlAlchemy engine and scoped session factory
instances that are used to provide the query builder with a session.
:param profile: :class:`aiida.manage.configuration.profile.Profile` for which to configure the engine.
:return: :class:`sqlalchemy.orm.session.Session` instance with engine configured for the given profile.
"""
from aiida.manage.configuration import get_profile

global ENGINE
global SESSION_FACTORY

if SESSION_FACTORY is not None:
session = SESSION_FACTORY()
return session

if ENGINE is None:
ENGINE = create_sqlalchemy_engine(get_profile())

SESSION_FACTORY = create_scoped_session_factory(ENGINE)

return SESSION_FACTORY()
2 changes: 1 addition & 1 deletion aiida/backends/djsite/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def _load_backend_environment(self):

def reset_backend_environment(self):
"""Reset the backend environment."""
from aiida.orm.implementation.django.querybuilder import reset_session
from . import reset_session
reset_session()

def is_database_schema_ahead(self):
Expand Down
13 changes: 8 additions & 5 deletions aiida/backends/djsite/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ def get_creation_statistics(self, user_pk=None):
# pylint: disable=no-member
import sqlalchemy as sa
import aiida.backends.djsite.db.models as djmodels
from aiida.orm.implementation.django.querybuilder import DjangoQueryBuilder
from aiida.manage.manager import get_manager
backend = get_manager().get_backend()

# Get the session (uses internally aldjemy - so, sqlalchemy) also for the Djsite backend
sssn = DjangoQueryBuilder.get_session()
session = backend.get_session()

retdict = {}

total_query = sssn.query(djmodels.DbNode.sa)
types_query = sssn.query(djmodels.DbNode.sa.node_type.label('typestring'), sa.func.count(djmodels.DbNode.sa.id))
stat_query = sssn.query(
total_query = session.query(djmodels.DbNode.sa)
types_query = session.query(
djmodels.DbNode.sa.node_type.label('typestring'), sa.func.count(djmodels.DbNode.sa.id)
)
stat_query = session.query(
sa.func.date_trunc('day', djmodels.DbNode.sa.ctime).label('cday'), sa.func.count(djmodels.DbNode.sa.id)
)

Expand Down
83 changes: 39 additions & 44 deletions aiida/backends/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,26 @@
###########################################################################
# pylint: disable=import-error,no-name-in-module,global-statement
"""Module with implementation of the database backend using SqlAlchemy."""
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from aiida.backends.utils import create_sqlalchemy_engine, create_scoped_session_factory

# The next two serve as global variables, set in the `load_dbenv` call and should be properly reset upon forking.
ENGINE = None
SCOPED_SESSION_CLASS = None
SESSION_FACTORY = None


def reset_session():
"""Reset the session which means setting the global engine and session factory instances to `None`."""
global ENGINE
global SESSION_FACTORY

if ENGINE is not None:
ENGINE.dispose()

if SESSION_FACTORY is not None:
SESSION_FACTORY.expunge_all() # pylint: disable=no-member
SESSION_FACTORY.close() # pylint: disable=no-member

ENGINE = None
SESSION_FACTORY = None


def get_scoped_session():
Expand All @@ -24,55 +37,37 @@ def get_scoped_session():
According to SQLAlchemy docs, this returns always the same object within a thread, and a different object in a
different thread. Moreover, since we update the session class upon forking, different session objects will be used.
"""
global SCOPED_SESSION_CLASS

if SCOPED_SESSION_CLASS is None:
reset_session()
from multiprocessing.util import register_after_fork
from aiida.manage.configuration import get_profile

return SCOPED_SESSION_CLASS()
global ENGINE
global SESSION_FACTORY

if SESSION_FACTORY is not None:
session = SESSION_FACTORY()
return session

def recreate_after_fork(engine): # pylint: disable=unused-argument
"""Callback called after a fork.
if ENGINE is None:
ENGINE = create_sqlalchemy_engine(get_profile())

Not only disposes the engine, but also recreates a new scoped session to use independent sessions in the fork.
SESSION_FACTORY = create_scoped_session_factory(ENGINE, expire_on_commit=True)
register_after_fork(ENGINE, recreate_after_fork)

:param engine: the engine that will be used by the sessionmaker
"""
global ENGINE
global SCOPED_SESSION_CLASS
return SESSION_FACTORY()

ENGINE.dispose()
SCOPED_SESSION_CLASS = scoped_session(sessionmaker(bind=ENGINE, expire_on_commit=True))

def recreate_after_fork(engine):
"""Callback called after a fork.
def reset_session(profile=None):
"""
Resets (global) engine and sessionmaker classes, to create a new one
(or creates a new one from scratch if not already available)
Not only disposes the engine, but also recreates a new scoped session to use independent sessions in the fork.
:param profile: the profile whose configuration to use to connect to the database
:param engine: the engine that will be used by the sessionmaker
"""
from multiprocessing.util import register_after_fork
from aiida.common import json
from aiida.manage.configuration import get_profile

global ENGINE
global SCOPED_SESSION_CLASS

if profile is None:
profile = get_profile()

separator = ':' if profile.database_port else ''
engine_url = 'postgresql://{user}:{password}@{hostname}{separator}{port}/{name}'.format(
separator=separator,
user=profile.database_username,
password=profile.database_password,
hostname=profile.database_hostname,
port=profile.database_port,
name=profile.database_name
)

ENGINE = create_engine(engine_url, json_serializer=json.dumps, json_deserializer=json.loads, encoding='utf-8')
SCOPED_SESSION_CLASS = scoped_session(sessionmaker(bind=ENGINE, expire_on_commit=True))
register_after_fork(ENGINE, recreate_after_fork)
global SESSION_FACTORY

engine.dispose()
ENGINE = create_sqlalchemy_engine(get_profile())
SESSION_FACTORY = create_scoped_session_factory(ENGINE, expire_on_commit=True)
9 changes: 3 additions & 6 deletions aiida/backends/sqlalchemy/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,12 @@ def get_settings_manager(self):

def _load_backend_environment(self):
"""Load the backend environment."""
from . import reset_session
reset_session()
get_scoped_session()

def reset_backend_environment(self):
"""Reset the backend environment."""
from aiida.backends import sqlalchemy
if sqlalchemy.ENGINE is not None:
sqlalchemy.ENGINE.dispose()
sqlalchemy.SCOPED_SESSION_CLASS = None
from . import reset_session
reset_session()

def is_database_schema_ahead(self):
"""Determine whether the database schema version is ahead of the code schema version.
Expand Down
21 changes: 21 additions & 0 deletions aiida/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,27 @@
AIIDA_ATTRIBUTE_SEP = '.'


def create_sqlalchemy_engine(profile):
from sqlalchemy import create_engine
from aiida.common import json

separator = ':' if profile.database_port else ''
engine_url = 'postgresql://{user}:{password}@{hostname}{separator}{port}/{name}'.format(
separator=separator,
user=profile.database_username,
password=profile.database_password,
hostname=profile.database_hostname,
port=profile.database_port,
name=profile.database_name
)
return create_engine(engine_url, json_serializer=json.dumps, json_deserializer=json.loads, encoding='utf-8')


def create_scoped_session_factory(engine, **kwargs):
from sqlalchemy.orm import scoped_session, sessionmaker
return scoped_session(sessionmaker(bind=engine, **kwargs))


def delete_nodes_and_connections(pks):
if configuration.PROFILE.database_backend == BACKEND_DJANGO:
from aiida.backends.djsite.utils import delete_nodes_and_connections_django as delete_nodes_backend
Expand Down
7 changes: 7 additions & 0 deletions aiida/orm/implementation/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ def transaction(self):
:return: a context manager to group database operations
"""

@abc.abstractmethod
def get_session(self):
"""Return a database session that can be used by the `QueryBuilder` to perform its query.
:return: an instance of :class:`sqlalchemy.orm.session.Session`
"""


class BackendEntity(abc.ABC):
"""An first-class entity in the backend"""
Expand Down
13 changes: 12 additions & 1 deletion aiida/orm/implementation/django/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Django implementation of `aiida.orm.implementation.backends.Backend`."""

from contextlib import contextmanager

# pylint: disable=import-error,no-name-in-module
Expand Down Expand Up @@ -89,6 +88,18 @@ def transaction():
"""Open a transaction to be used as a context manager."""
return transaction.atomic()

@staticmethod
def get_session():
"""Return a database session that can be used by the `QueryBuilder` to perform its query.
If there is an exception within the context then the changes will be rolled back and the state will
be as before entering. Transactions can be nested.
:return: an instance of :class:`sqlalchemy.orm.session.Session`
"""
from aiida.backends.djsite import get_scoped_session
return get_scoped_session()

# Below are abstract methods inherited from `aiida.orm.implementation.sql.backends.SqlBackend`

def get_backend_entity(self, model):
Expand Down
Loading

0 comments on commit 09cb6cb

Please sign in to comment.