Skip to content

Commit

Permalink
DB-22748 Fix canceling queries in Superset (#2)
Browse files Browse the repository at this point in the history
* Adding support for query cancellation

* Removing double import

* Adding support for query cancellation

Removing double import

Inverting logic for fetch data.

Re-add missing function

* Fiding doubled up merge inputs

* Handle Query cancellation for multiple queries

* Adding mutex lock to query id mapping

* Removing unecesssary iteration over values of cache

* Fixing locks in db engine spec

* Fix error capture in fetch_data
  • Loading branch information
alexclavel-ocient authored Jan 30, 2023
1 parent 8059833 commit 6c1d956
Showing 1 changed file with 58 additions and 14 deletions.
72 changes: 58 additions & 14 deletions superset/db_engine_specs/ocient.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import re

from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import Session
from superset.db_engine_specs.base import BaseEngineSpec
from superset.errors import SupersetErrorType
from flask_babel import gettext as __
Expand All @@ -27,6 +28,9 @@
from superset import app
from superset.models.core import Database
from typing import Any, Callable, Dict, List, NamedTuple, Tuple, Optional, Pattern
import threading

from superset.models.sql_lab import Query
# Ensure pyocient inherits Superset's logging level
superset_log_level = app.config['LOG_LEVEL']
pyocient.logger.setLevel(superset_log_level)
Expand Down Expand Up @@ -62,6 +66,8 @@
"The reference to column '(?P<column>.*?)' is not valid"
)



# Custom datatype conversion functions

def _to_hex(data: bytes) -> str:
Expand Down Expand Up @@ -154,6 +160,12 @@ class OcientEngineSpec(BaseEngineSpec):
force_column_alias_quotes = True
max_column_name_length = 30

# Store mapping of superset Query id -> Ocient ID
# These are inserted into the cache when executing the query
# They are then removed, either upon cancellation or query completion
query_id_mapping: Dict[str, str]= dict()
query_id_mapping_lock = threading.Lock()

custom_errors : Dict[Pattern[str], Tuple[str, SupersetErrorType, Dict[str, Any]]] = {
CONNECTION_INVALID_USERNAME_REGEX: (
__('The username "%(username)s" does not exist.'),
Expand Down Expand Up @@ -212,28 +224,29 @@ class OcientEngineSpec(BaseEngineSpec):
"P0.25Y": "ROUND({col}, 'QUARTER')",
"P1Y": "ROUND({col}, 'YEAR')",
}


@classmethod
def get_table_names(
cls, database: "Database", inspector: Inspector, schema: Optional[str]
cls, database: Database, inspector: Inspector, schema: Optional[str]
) -> List[str]:
return sorted(inspector.get_table_names(schema))


@classmethod
def fetch_data(cls, cursor, lim=None):
rows = super(OcientEngineSpec, cls).fetch_data(cursor)

if (len(rows)) == 0:
# No rows were produced
return rows

if type(rows[0]).__name__ != 'Row':
# TODO what else is returned here?
return rows

# Peek at the schema to determine which column values, if any,
# require sanitization.
columns_to_sanitize: List[PlacedSanitizeFunc] = _find_columns_to_sanitize(cursor)
try:
rows = super(OcientEngineSpec, cls).fetch_data(cursor)
except Exception as exception:
with OcientEngineSpec.query_id_mapping_lock:
del OcientEngineSpec.query_id_mapping[getattr(cursor, 'superset_query_id')]
raise exception


if len(rows) > 0 and type(rows[0]).__name__ == rows:
# Peek at the schema to determine which column values, if any,
# require sanitization.
columns_to_sanitize: List[PlacedSanitizeFunc] = _find_columns_to_sanitize(cursor)

if columns_to_sanitize:
# At least 1 column has to be sanitized.
Expand All @@ -247,3 +260,34 @@ def do_nothing(x):
# Rows from pyocient are given as NamedTuple, so we need to recreate the whole table
rows = [[sanitization_functions[i](row[i]) for i in range(len(row))] for row in rows]
return rows


@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
# Return a Non-None value
# If None is returned, Superset will not call cancel_query
return 'DUMMY_VALUE'


@classmethod
def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None:
with OcientEngineSpec.query_id_mapping_lock:
OcientEngineSpec.query_id_mapping[query.id] = cursor.query_id

# Add the query id to the cursor
setattr(cursor, "superset_query_id", query.id)
return super().handle_cursor(cursor, query, session)


@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
with OcientEngineSpec.query_id_mapping_lock:
if query.id in OcientEngineSpec.query_id_mapping:
cursor.execute(f'CANCEL {OcientEngineSpec.query_id_mapping[query.id]}')
# Query has been cancelled, so we can safely remove the cursor from the cache
del OcientEngineSpec.query_id_mapping[query.id]

return True
# If the query is not in the cache, it must have either been cancelled elsewhere or completed
else:
return False

0 comments on commit 6c1d956

Please sign in to comment.