Skip to content

Commit

Permalink
feat: extend logging of db.py and dbview.py
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkh committed Jul 24, 2023
1 parent 494a27d commit c809afc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 10 deletions.
51 changes: 41 additions & 10 deletions tdp_core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any

from flask import abort
from sqlalchemy.exc import OperationalError
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm import Session
from visyn_core import manager
from werkzeug.datastructures import MultiDict
Expand Down Expand Up @@ -107,9 +107,17 @@ def __init__(self, engine):
session wrapper of sql alchemy with auto cleanup
:param engine:
"""
_log.info("creating session")
self._engine = engine
import uuid
self._name = uuid.uuid4()
_log.debug("%s - engine status before: %s", self._name, engine.pool.status())
_log.debug("%s - creating session", self._name)
# add connection count and session count with SQLALCHEMY_POOL_SIZE and SQLALCHEMY_MAX_OVERFLOW
# https://stackoverflow.com/questions/34775501/how-could-i-check-the-number-of-active-sqlalchemy-connections-in-a-pool-at-any-g
self._session: Session = manager.db.create_session(engine)
_log.debug("%s - session created", self._name)
self._supports_array_parameter = _supports_sql_parameters(engine.name)
_log.debug("%s - supports array parameter: %s", self._name, self._supports_array_parameter)

def execute(self, sql, **kwargs):
"""
Expand All @@ -118,12 +126,18 @@ def execute(self, sql, **kwargs):
:param kwargs: additional args to replace
:return: the session result
"""
_log.debug("%s - replace array parameter in sql query: %s", self._name, sql)
parsed = to_query(sql, self._supports_array_parameter, kwargs)
_log.info("%s (%s)", parsed, kwargs)
_log.debug("%s - execute the given query with the given args: %s", self._name, sql)
_log.debug("%s (%s)", parsed, kwargs)
try:
return self._session.execute(parsed, kwargs)
except OperationalError as error:
_log.error('OperationalError: %s', error)
abort(408, error)
except SQLAlchemyError as error:
_log.error('SQLAlchemyError: %s', error)


def run(self, sql, **kwargs):
"""
Expand All @@ -132,7 +146,9 @@ def run(self, sql, **kwargs):
:param kwargs: args for this query
:return: list of dicts
"""
_log.debug("%s - run sql statement: %s", self._name, sql)
result = self.execute(sql, **kwargs)
_log.debug("%s - ran sql statement: %s", self._name, sql)
columns = result.keys()
return [{c: r[c] for c in columns} for r in result]

Expand All @@ -153,9 +169,11 @@ def rollback(self):

def _destroy(self):
if self._session:
_log.info("removing session again")
_log.debug("%s - removing session", self._name)
self._session.close()
self._session = None # type: ignore
_log.debug("%s - removed session", self._name)
_log.debug("%s - engine status after destroy: %s", self._name, self._engine.pool.status())

def __del__(self):
self._destroy()
Expand Down Expand Up @@ -342,14 +360,18 @@ def get_data(
query = view.query

if callable(query):
_log.debug("GET DATA with callback variant")
# callback variant
return query(engine, arguments, filters), view

with session(engine) as sess:
_log.debug("%s - GET DATA with session", sess._name)
if config.statement_timeout and config.statement_timeout_query:
_log.info("set statement_timeout to {}".format(config.statement_timeout))
_log.debug("set statement_timeout to {}".format(config.statement_timeout))
sess.execute(config.statement_timeout_query.format(config.statement_timeout))
_log.debug("%s - GET DATA before run", sess._name)
r = sess.run(query.format(**replace), **kwargs)
_log.debug("%s - GET DATA after run", sess._name)
return r, view


Expand Down Expand Up @@ -434,10 +456,13 @@ def get_count(database, view_name, args):
return count_query(engine, processed_args, where_clause)

with session(engine) as sess:
_log.debug("%s - GET COUNT with session", sess._name)
if config.statement_timeout and config.statement_timeout_query:
_log.info("set statement_timeout to {}".format(config.statement_timeout))
_log.debug("set statement_timeout to {}".format(config.statement_timeout))
sess.execute(config.statement_timeout_query.format(config.statement_timeout))
_log.debug("%s - GET COUNT before run", sess._name)
r = sess.run(count_query.format(**replace), **kwargs)
_log.debug("%s - GET COUNT after run", sess._name)
if r:
return r[0]["count"]
return 0
Expand Down Expand Up @@ -483,11 +508,14 @@ def derive_columns(table_name, engine, columns=None):
k for k, col in columns.items() if (col["type"] == "categorical" or col["type"] == "set") and "categories" not in col
]
if number_columns or categorical_columns:
with session(engine) as s:
with session(engine) as sess:
_log.debug("%s - DERIVE COLUMNS with session", sess._name)
if number_columns:
template = "min({col}) as {col}_min, max({col}) as {col}_max"
minmax = ", ".join(template.format(col=col) for col in number_columns)
row = next(iter(s.execute("""SELECT {minmax} FROM {table}""".format(table=table_name, minmax=minmax))))
_log.debug("%s - DERIVE COLUMNS number columns before run", sess._name)
row = next(iter(sess.execute("""SELECT {minmax} FROM {table}""".format(table=table_name, minmax=minmax))))
_log.debug("%s - DERIVE COLUMNS number columns after run", sess._name)
for num_col in number_columns:
columns[num_col]["min"] = row[num_col + "_min"]
columns[num_col]["max"] = row[num_col + "_max"]
Expand All @@ -496,7 +524,9 @@ def derive_columns(table_name, engine, columns=None):
if _differentiates_empty_string_and_null(engine.name):
template += """ AND {col} <> ''"""
template += """ ORDER BY {col} ASC"""
cats = s.execute(template.format(col=col, table=table_name))
_log.debug("%s - DERIVE COLUMNS categorical columns before run: %s, %s", sess._name, table_name, col)
cats = sess.execute(template.format(col=col, table=table_name))
_log.debug("%s - DERIVE COLUMNS categorical columns after run: %s, %s", sess._name, table_name, col)
categories = [str(r["cat"]) for r in cats if r["cat"] is not None]
if columns[col]["type"] == "set":
separator = getattr(columns[col], "separator", ";")
Expand All @@ -505,12 +535,13 @@ def derive_columns(table_name, engine, columns=None):
categories = list({category for sublist in separated_categories for category in sublist})
categories.sort() # sort list to avoid random order with each run
columns[col]["categories"] = categories
_log.debug("%s - DERIVE COLUMNS done", sess._name)

return columns


def _fill_up_columns(view, engine):
_log.info("fill up view")
_log.debug("fill up view %s", view)
# update the real object
view.columns = derive_columns(view.table, engine, view.columns)
view.columns_filled_up = True
Expand Down
3 changes: 3 additions & 0 deletions tdp_core/dbview.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ def __init__(self, views=None, agg_score=None, mappings=None):
:param agg_score: optional specify how aggregation should be handled
:param mappings: optional database mappings
"""
_log.debug("create db connector")
self.agg_score = agg_score or default_agg_score
self.views = views or {}
self.dburl: str = None # type: ignore
Expand All @@ -625,7 +626,9 @@ def create_engine(self, config) -> Engine:
"pool_pre_ping": True,
}
engine_options.update(config.get("engine", {}))
_log.debug("db connector: create engine with options %s", engine_options)
return sqlalchemy.create_engine(self.dburl, **engine_options)

def create_sessionmaker(self, engine) -> sessionmaker:
_log.debug("db connector: create_sessionmaker")
return sessionmaker(bind=engine)

0 comments on commit c809afc

Please sign in to comment.