Explicitly close cursors #163

Merged 2 commits on Aug 29, 2022
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,11 @@
- Apply "Initial refactoring of incremental materialization" ([#148](https://github.com/databricks/dbt-databricks/pull/148))
- Now dbt-databricks uses `adapter.get_incremental_strategy_macro` instead of `dbt_spark_get_incremental_sql` macro to dispatch the incremental strategy macro. The overwritten `dbt_spark_get_incremental_sql` macro will not work anymore.

## dbt-databricks 1.2.2 (Release TBD)

### Under the hood
- Explicitly close cursors ([#163](https://github.com/databricks/dbt-databricks/pull/163))

## dbt-databricks 1.2.1 (August 24, 2022)

### Features
127 changes: 86 additions & 41 deletions dbt/adapters/databricks/connections.py
@@ -1,3 +1,4 @@
import warnings
from contextlib import contextmanager
from dataclasses import dataclass
import itertools
@@ -22,7 +23,8 @@
import dbt.exceptions
from dbt.adapters.base import Credentials
from dbt.adapters.databricks import __version__
from dbt.contracts.connection import Connection, ConnectionState
from dbt.clients import agate_helper
from dbt.contracts.connection import AdapterResponse, Connection, ConnectionState
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus
@@ -141,46 +143,53 @@ def _connection_keys(self, *, with_aliases: bool = False) -> Tuple[str, ...]:
return tuple(connection_keys)


class DatabricksSQLConnectionWrapper(object):
class DatabricksSQLConnectionWrapper:
"""Wrap a Databricks SQL connector in a way that no-ops transactions"""

_conn: DatabricksSQLConnection
_cursor: Optional[DatabricksSQLCursor]

def __init__(self, conn: DatabricksSQLConnection):
self._conn = conn
self._cursor = None

def cursor(self) -> "DatabricksSQLConnectionWrapper":
self._cursor = self._conn.cursor()
return self

def cancel(self) -> None:
if self._cursor:
try:
self._cursor.cancel()
except DBSQLError as exc:
logger.debug("Exception while cancelling query: {}".format(exc))
_log_dbsql_errors(exc)
def cursor(self) -> "DatabricksSQLCursorWrapper":
return DatabricksSQLCursorWrapper(self._conn.cursor())
Collaborator: Should we avoid creating an instance of the wrapper each time we invoke the cursor?

ueshin (Author, Aug 29, 2022): How can we manage each cursor otherwise?

Collaborator: Nvm, this method should return a new cursor.


def close(self) -> None:
if self._cursor:
try:
self._cursor.close()
except DBSQLError as exc:
logger.debug("Exception while closing cursor: {}".format(exc))
_log_dbsql_errors(exc)
self._conn.close()

def rollback(self, *args: Any, **kwargs: Any) -> None:
logger.debug("NotImplemented: rollback")
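The review thread above settled on `cursor()` returning a fresh wrapper on every call, so each cursor can be closed independently. A minimal sketch of that shape, with `sqlite3` standing in for the DBSQL connection (the class names here are illustrative, not the adapter's actual API):

```python
import sqlite3

class CursorWrapperSketch:
    """Minimal stand-in for DatabricksSQLCursorWrapper."""
    def __init__(self, cursor):
        self._cursor = cursor

    def execute(self, sql):
        self._cursor.execute(sql)

    def fetchone(self):
        return self._cursor.fetchone()

    def close(self):
        self._cursor.close()

class ConnectionWrapperSketch:
    """Each cursor() call returns a new wrapper, so callers can close
    one cursor without affecting any other cursor on the connection."""
    def __init__(self, conn):
        self._conn = conn

    def cursor(self):
        return CursorWrapperSketch(self._conn.cursor())

conn = ConnectionWrapperSketch(sqlite3.connect(":memory:"))
c1 = conn.cursor()
c2 = conn.cursor()
c1.execute("SELECT 1")
c1.close()              # closing c1 leaves c2 fully usable
c2.execute("SELECT 2")
```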


class DatabricksSQLCursorWrapper:
"""Wrap a Databricks SQL cursor in a way that no-ops transactions"""

_cursor: DatabricksSQLCursor

def __init__(self, cursor: DatabricksSQLCursor):
self._cursor = cursor

def cancel(self) -> None:
try:
self._cursor.cancel()
except DBSQLError as exc:
logger.debug("Exception while cancelling query: {}".format(exc))
_log_dbsql_errors(exc)

def close(self) -> None:
try:
self._cursor.close()
except DBSQLError as exc:
logger.debug("Exception while closing cursor: {}".format(exc))
_log_dbsql_errors(exc)

def fetchall(self) -> Sequence[Tuple]:
assert self._cursor is not None
return self._cursor.fetchall()

def fetchone(self) -> Optional[Tuple]:
return self._cursor.fetchone()

def execute(self, sql: str, bindings: Optional[Sequence[Any]] = None) -> None:
assert self._cursor is not None
if sql.strip().endswith(";"):
sql = sql.strip()[:-1]
if bindings is not None:
@@ -210,21 +219,23 @@ def description(
Optional[bool],
]
]:
assert self._cursor is not None
return self._cursor.description

def schemas(self, catalog_name: str, schema_name: Optional[str] = None) -> None:
assert self._cursor is not None
self._cursor.schemas(catalog_name=catalog_name, schema_name=schema_name)

def __del__(self) -> None:
if self._cursor.open:
# This should not happen. The cursor should explicitly be closed.
self._cursor.close()
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn("The cursor was closed by destructor.")
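The `__del__` safety net above can be exercised in isolation. In this sketch `FakeCursor` is a hypothetical stand-in for the DBSQL cursor; the wrapper body mirrors the destructor logic the PR adds:

```python
import warnings

class FakeCursor:
    """Hypothetical stand-in for a DBSQL cursor, for illustration only."""
    def __init__(self):
        self.open = True

    def close(self):
        self.open = False

class CursorWrapperSketch:
    """Mirrors the destructor safety net added in this PR."""
    def __init__(self, cursor):
        self._cursor = cursor

    def close(self):
        self._cursor.close()

    def __del__(self):
        if self._cursor.open:
            # This should not happen; the cursor should be closed explicitly.
            self._cursor.close()
            with warnings.catch_warnings():
                warnings.simplefilter("always")
                warnings.warn("The cursor was closed by destructor.")
```

Dropping the last reference to an unclosed wrapper closes the underlying cursor and emits the warning, which makes cursor leaks visible during development instead of silent.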


class DatabricksConnectionManager(SparkConnectionManager):
TYPE: ClassVar[str] = "databricks"

DROP_JAVA_STACKTRACE_REGEX: ClassVar["re.Pattern[str]"] = re.compile(
r"(?<=Caused by: )(.+?)(?=^\t?at )", re.DOTALL | re.MULTILINE
)

Collaborator: This is deleted because we never used it?

ueshin (Author, Aug 29, 2022): Yes. It should've been deleted when we dropped databricks-sql-connector<2.0.

@contextmanager
def exception_handler(self, sql: str) -> Iterator[None]:
try:
@@ -248,28 +259,62 @@ def exception_handler(self, sql: str) -> Iterator[None]:
else:
raise dbt.exceptions.RuntimeException(str(exc)) from exc

def add_query(
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
*,
close_cursor: bool = False,
) -> Tuple[Connection, Any]:
conn, cursor = super().add_query(sql, auto_begin, bindings, abridge_sql_log)
if close_cursor and hasattr(cursor, "close"):
cursor.close()
return conn, cursor

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, Table]:
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin)
try:
response = self.get_response(cursor)
if fetch:
table = self.get_result_from_cursor(cursor)
else:
table = agate_helper.empty_table()
return response, table
finally:
cursor.close()
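The try/finally in `execute` above is the standard "close the cursor no matter what" pattern, equivalent to `contextlib.closing`. A runnable sketch with `sqlite3` standing in for the DBSQL connection (the helper name is an assumption, not the adapter's API):

```python
import sqlite3
from contextlib import closing

def run_query(conn, sql):
    # Equivalent to the PR's try/finally: the cursor is closed
    # whether or not execute/fetchall raises.
    with closing(conn.cursor()) as cursor:
        cursor.execute(sql)
        return cursor.fetchall()

conn = sqlite3.connect(":memory:")
rows = run_query(conn, "SELECT 1 AS x")
```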

def _execute_cursor(
self, log_sql: str, f: Callable[[DatabricksSQLConnectionWrapper], None]
self, log_sql: str, f: Callable[[DatabricksSQLCursorWrapper], None]
) -> Table:
connection = self.get_thread_connection()

fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=connection.name))

with self.exception_handler(log_sql):
fire_event(SQLQuery(conn_name=connection.name, sql=log_sql))
pre = time.time()
cursor: Optional[DatabricksSQLCursorWrapper] = None
try:
with self.exception_handler(log_sql):
fire_event(SQLQuery(conn_name=connection.name, sql=log_sql))
pre = time.time()

handle: DatabricksSQLConnectionWrapper = connection.handle
cursor = handle.cursor()
f(cursor)
handle: DatabricksSQLConnectionWrapper = connection.handle
cursor = handle.cursor()
f(cursor)

fire_event(
SQLQueryStatus(
status=str(self.get_response(cursor)), elapsed=round((time.time() - pre), 2)
fire_event(
SQLQueryStatus(
status=str(self.get_response(cursor)), elapsed=round((time.time() - pre), 2)
)
)
)

return self.get_result_from_cursor(cursor)
return self.get_result_from_cursor(cursor)
finally:
if cursor is not None:
cursor.close()

def list_schemas(self, database: str, schema: Optional[str] = None) -> Table:
return self._execute_cursor(
35 changes: 34 additions & 1 deletion dbt/adapters/databricks/impl.py
@@ -16,7 +16,7 @@
LIST_RELATIONS_MACRO_NAME,
LIST_SCHEMAS_MACRO_NAME,
)
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.connection import AdapterResponse, Connection
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.relation import RelationType
import dbt.exceptions
@@ -211,6 +211,39 @@ def _get_columns_for_catalog(self, relation: DatabricksRelation) -> Iterable[Dic
as_dict["column_type"] = as_dict.pop("dtype")
yield as_dict

def add_query(
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
*,
close_cursor: bool = False,
) -> Tuple[Connection, Any]:
return self.connections.add_query(
sql, auto_begin, bindings, abridge_sql_log, close_cursor=close_cursor
)

def run_sql_for_tests(
self, sql: str, fetch: str, conn: Connection
) -> Optional[Union[Optional[Tuple], List[Tuple]]]:
cursor = conn.handle.cursor()
try:
cursor.execute(sql)
if fetch == "one":
return cursor.fetchone()
elif fetch == "all":
return cursor.fetchall()
else:
return None
except BaseException as e:
print(sql)
print(e)
raise
finally:
cursor.close()
conn.transaction_open = False

def valid_incremental_strategies(self) -> List[str]:
return ["append", "merge", "insert_overwrite"]

2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/materializations/seed.sql
@@ -29,7 +29,7 @@
{%- endfor %}
{% endset %}

{% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}
{% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True, close_cursor=True) %}
Collaborator: Why do we need to pass the close_cursor flag for seed?

Author: adapter.add_query doesn't close the cursor by default, so the cursor would otherwise remain open.

Collaborator: I see!

{% if loop.index0 == 0 %}
{% do statements.append(sql) %}
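The exchange above can be made concrete: the keyword-only `close_cursor` flag just closes the cursor before `add_query` returns, so a seed's many inserts don't each leave a cursor open. A sketch using fake objects (none of these class names are the adapter's actual API):

```python
from typing import Any, Optional, Tuple

class _FakeCursor:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

class _FakeConnections:
    """Stand-in for the connection manager's base add_query path."""
    def run(self, sql, bindings=None):
        return "conn", _FakeCursor()

class AdapterSketch:
    """Illustrative stand-in mirroring the close_cursor keyword this PR adds."""
    def __init__(self, connections):
        self.connections = connections

    def add_query(
        self,
        sql: str,
        auto_begin: bool = True,
        bindings: Optional[Any] = None,
        abridge_sql_log: bool = False,
        *,
        close_cursor: bool = False,
    ) -> Tuple[Any, Any]:
        conn, cursor = self.connections.run(sql, bindings)
        if close_cursor and hasattr(cursor, "close"):
            cursor.close()  # seed materialization passes close_cursor=True
        return conn, cursor

adapter = AdapterSketch(_FakeConnections())
_, cur = adapter.add_query("insert into t values (?)", bindings=[1], close_cursor=True)
```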
2 changes: 1 addition & 1 deletion tests/integration/base.py
@@ -478,7 +478,7 @@ def run_sql(self, query, fetch="None", kwargs=None, connection_name=None):
try:
cursor.execute(sql)
if fetch == "one":
return cursor.fetchall()[0]
return cursor.fetchone()
elif fetch == "all":
return cursor.fetchall()
else: