Add spark session connection #279

Merged Mar 26, 2022 · 60 commits

Commits
- `23a114e` Add session module (JCZuurmond, Jan 28, 2022)
- `1a19d96` Add session connection method (JCZuurmond, Jan 28, 2022)
- `cd707b1` Add session extras to setup.py (JCZuurmond, Jan 28, 2022)
- `dbee595` Add check for session method (JCZuurmond, Jan 28, 2022)
- `195e925` Add session connection wrapper (JCZuurmond, Jan 28, 2022)
- `ba1b5d9` Add session to connection manager (JCZuurmond, Jan 28, 2022)
- `7f8e88d` Remove unused imports (JCZuurmond, Jan 28, 2022)
- `b3db680` Add spark session dbtspec (JCZuurmond, Jan 28, 2022)
- `1a040d7` Add tox spark session environment (JCZuurmond, Jan 28, 2022)
- `97b562a` Add missing settings to dbt spec (JCZuurmond, Jan 28, 2022)
- `eec242c` Install session requirements (JCZuurmond, Jan 28, 2022)
- `7162ee6` Add tox spark session to circle ci (JCZuurmond, Jan 28, 2022)
- `ff421db` Add pytest spark as test requirement (JCZuurmond, Jan 28, 2022)
- `10267b1` Add fixture to force use of spark session (JCZuurmond, Jan 28, 2022)
- `0c39bf1` Add pytest ini (JCZuurmond, Jan 28, 2022)
- `0bd8499` Update passenv in tox (JCZuurmond, Jan 28, 2022)
- `1926a54` Set catalog implementation to hive (JCZuurmond, Jan 28, 2022)
- `8969056` Make separate session connection wrapper (JCZuurmond, Feb 6, 2022)
- `4163251` Format parameters (JCZuurmond, Feb 6, 2022)
- `425a002` Run spark session before thrift (JCZuurmond, Feb 6, 2022)
- `4573733` Add spark to dev requirements (JCZuurmond, Feb 6, 2022)
- `61119c6` Fix session module (JCZuurmond, Feb 6, 2022)
- `0b88252` Bump Spark session python version (JCZuurmond, Feb 6, 2022)
- `4a3008d` Change docker image for spark session (JCZuurmond, Feb 6, 2022)
- `65aafcc` Install python3 (JCZuurmond, Feb 6, 2022)
- `546d509` Update ci (JCZuurmond, Feb 9, 2022)
- `4fb252c` Remove spark fixture (JCZuurmond, Feb 9, 2022)
- `e962364` Move session connection wrapper to session module (JCZuurmond, Feb 9, 2022)
- `15aaa7c` Disable tests that require hive support (JCZuurmond, Feb 9, 2022)
- `94fb929` Format (JCZuurmond, Feb 9, 2022)
- `1d69d63` Change python 3 to python 3.8 (JCZuurmond, Feb 9, 2022)
- `f1abb75` Install non-python dependencies (JCZuurmond, Feb 9, 2022)
- `48ac80d` Remove dev-requirements (JCZuurmond, Feb 9, 2022)
- `5791dc9` Remove pytest ini (JCZuurmond, Feb 9, 2022)
- `99f44e3` Update the install (JCZuurmond, Feb 9, 2022)
- `237bc56` Add session method to change log (JCZuurmond, Feb 9, 2022)
- `5d26e1b` Do not pin sasl version (JCZuurmond, Mar 10, 2022)
- `3643f50` Delete spark session test profile (JCZuurmond, Mar 10, 2022)
- `0d9ef41` Add postgres container for hive support (JCZuurmond, Mar 15, 2022)
- `edfa4d0` Enable all session tests (JCZuurmond, Mar 15, 2022)
- `c2ea0b2` Enable hive support (JCZuurmond, Mar 15, 2022)
- `311928a` Add delta as file format (JCZuurmond, Mar 15, 2022)
- `cd0ed28` Use equals in spark defaults (JCZuurmond, Mar 16, 2022)
- `f2d4baa` Change reference to find spark home (JCZuurmond, Mar 16, 2022)
- `b53bdc3` Copy configs in one go (JCZuurmond, Mar 16, 2022)
- `9a41af7` List spark conf (JCZuurmond, Mar 16, 2022)
- `7196cd7` Let session test be the same as thrift (JCZuurmond, Mar 16, 2022)
- `fda5b01` Update spark defaults (JCZuurmond, Mar 16, 2022)
- `1164a81` Enable error logging on postgres (JCZuurmond, Mar 16, 2022)
- `d5e6bd6` Remove ls (JCZuurmond, Mar 16, 2022)
- `afce326` Add port to connection url (JCZuurmond, Mar 16, 2022)
- `3ff142a` Do not copy spark config (JCZuurmond, Mar 16, 2022)
- `1efabb3` Print postgres (JCZuurmond, Mar 24, 2022)
- `63ecb64` Remove postgres logging from thrift (JCZuurmond, Mar 24, 2022)
- `85d23b8` Remove postgres from spark session tests (JCZuurmond, Mar 24, 2022)
- `902b993` Change connection url back to dbt-hive-metastore (JCZuurmond, Mar 24, 2022)
- `ef3c35b` Revert Spark defaults changes (JCZuurmond, Mar 24, 2022)
- `40504cf` Disable tests and explain why (JCZuurmond, Mar 24, 2022)
- `37ebe30` Move change log to top of file (JCZuurmond, Mar 24, 2022)
- `5d745ff` Move contributor note up in changelog (jtcohen6, Mar 26, 2022)
23 changes: 22 additions & 1 deletion .circleci/config.yml
@@ -10,6 +10,24 @@ jobs:
- checkout
- run: tox -e flake8,unit

integration-spark-session:
environment:
DBT_INVOCATION_ENV: circle
docker:
- image: godatadriven/pyspark:3.1
steps:
- checkout
- run: apt-get update
- run: python3 -m pip install --upgrade pip
- run: apt-get install -y git gcc g++ unixodbc-dev libsasl2-dev
- run: python3 -m pip install tox
- run:
name: Run integration tests
command: tox -e integration-spark-session
no_output_timeout: 1h
- store_artifacts:
path: ./logs

integration-spark-thrift:
environment:
DBT_INVOCATION_ENV: circle
@@ -90,7 +108,7 @@ jobs:
no_output_timeout: 1h
- store_artifacts:
path: ./logs

integration-spark-databricks-odbc-endpoint:
<<: *databricks-odbc
steps:
@@ -107,6 +125,9 @@ workflows:
test-everything:
jobs:
- unit
- integration-spark-session:
requires:
- unit
- integration-spark-thrift:
requires:
- unit
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
### Features
- Add session connection method ([#272](https://github.com/dbt-labs/dbt-spark/issues/272), [#279](https://github.com/dbt-labs/dbt-spark/pull/279))

### Contributors
- [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ([#279](https://github.com/dbt-labs/dbt-spark/pull/279))

## dbt-spark 1.1.0b1 (March 23, 2022)

### Features
19 changes: 19 additions & 0 deletions dbt/adapters/spark/connections.py
@@ -55,6 +55,7 @@ class SparkConnectionMethod(StrEnum):
THRIFT = 'thrift'
HTTP = 'http'
ODBC = 'odbc'
SESSION = 'session'


@dataclass
@@ -133,6 +134,18 @@ def __post_init__(self):
"`pip install dbt-spark[PyHive]`"
)

if self.method == SparkConnectionMethod.SESSION:
try:
import pyspark # noqa: F401
except ImportError as e:
raise dbt.exceptions.RuntimeException(
f"{self.method} connection method requires "
"additional dependencies. \n"
"Install the additional required dependencies with "
"`pip install dbt-spark[session]`\n\n"
f"ImportError({e.msg})"
) from e

@property
def type(self):
return 'spark'
@@ -443,6 +456,12 @@ def open(cls, connection):

conn = pyodbc.connect(connection_str, autocommit=True)
handle = PyodbcConnectionWrapper(conn)
elif creds.method == SparkConnectionMethod.SESSION:
from .session import ( # noqa: F401
Connection,
SessionConnectionWrapper,
)
handle = SessionConnectionWrapper(Connection())
else:
raise dbt.exceptions.DbtProfileError(
f"invalid credential method: {creds.method}"
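For orientation: the `SESSION` branch above hands dbt a wrapper around a mock pyodbc-style connection backed by the driver-local `SparkSession`. A minimal sketch of exercising that handle outside of dbt, assuming pyspark is installed (e.g. via `pip install dbt-spark[session]`); the query and the printed output are illustrative only:

```python
from dbt.adapters.spark.session import Connection, SessionConnectionWrapper

handle = SessionConnectionWrapper(Connection())

cursor = handle.cursor()          # pyodbc-style: returns the wrapper itself
cursor.execute("SELECT 1 AS id")  # builds/reuses the local SparkSession
print(cursor.description)         # [('id', 'int', None, None, None, None, False)]
print(cursor.fetchall())          # [Row(id=1)]
```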
221 changes: 221 additions & 0 deletions dbt/adapters/spark/session.py
@@ -0,0 +1,221 @@
"""Spark session integration."""

from __future__ import annotations

import datetime as dt
from types import TracebackType
from typing import Any

from dbt.events import AdapterLogger
from dbt.utils import DECIMALS
from pyspark.sql import DataFrame, Row, SparkSession


logger = AdapterLogger("Spark")
NUMBERS = DECIMALS + (int, float)


class Cursor:
"""
Mock a pyodbc cursor.

Source
------
https://github.com/mkleehammer/pyodbc/wiki/Cursor
"""

def __init__(self) -> None:
self._df: DataFrame | None = None
self._rows: list[Row] | None = None

def __enter__(self) -> Cursor:
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: Exception | None,
exc_tb: TracebackType | None,
) -> bool:
self.close()
return True

@property
def description(
self,
) -> list[tuple[str, str, None, None, None, None, bool]]:
"""
Get the description.

Returns
-------
out : list[tuple[str, str, None, None, None, None, bool]]
The description.

Source
------
https://github.com/mkleehammer/pyodbc/wiki/Cursor#description
"""
if self._df is None:
description = list()
else:
description = [
(
field.name,
field.dataType.simpleString(),
None,
None,
None,
None,
field.nullable,
)
for field in self._df.schema.fields
]
return description

def close(self) -> None:
"""
Close the cursor.

Source
------
https://github.com/mkleehammer/pyodbc/wiki/Cursor#close
"""
self._df = None
self._rows = None

def execute(self, sql: str, *parameters: Any) -> None:
"""
Execute a sql statement.

Parameters
----------
sql : str
The sql statement to execute.
*parameters : Any
The parameters.

Notes
-----
If parameters are given, they are interpolated into the sql
statement with printf-style (`%`) formatting.

Source
------
https://github.com/mkleehammer/pyodbc/wiki/Cursor#executesql-parameters
"""
if len(parameters) > 0:
sql = sql % parameters
spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()
self._df = spark_session.sql(sql)

def fetchall(self) -> list[Row] | None:
"""
Fetch all data.

Returns
-------
out : list[Row] | None
The rows.

Source
------
https://github.com/mkleehammer/pyodbc/wiki/Cursor#fetchall
"""
if self._rows is None and self._df is not None:
self._rows = self._df.collect()
return self._rows

def fetchone(self) -> Row | None:
"""
Fetch the first output.

Returns
-------
out : Row | None
The first row.

Source
------
https://github.com/mkleehammer/pyodbc/wiki/Cursor#fetchone
"""
if self._rows is None and self._df is not None:
self._rows = self._df.collect()

if self._rows is not None and len(self._rows) > 0:
row = self._rows.pop(0)
else:
row = None

return row


class Connection:
"""
Mock a pyodbc connection.

Source
------
https://github.com/mkleehammer/pyodbc/wiki/Connection
"""

def cursor(self) -> Cursor:
"""
Get a cursor.

Returns
-------
out : Cursor
The cursor.
"""
return Cursor()


class SessionConnectionWrapper(object):
"""Connection wrapper for the sessoin connection method."""

def __init__(self, handle):
self.handle = handle
self._cursor = None

def cursor(self):
self._cursor = self.handle.cursor()
return self

def cancel(self):
logger.debug("NotImplemented: cancel")

def close(self):
if self._cursor:
self._cursor.close()

def rollback(self, *args, **kwargs):
logger.debug("NotImplemented: rollback")

def fetchall(self):
return self._cursor.fetchall()

def execute(self, sql, bindings=None):
if sql.strip().endswith(";"):
sql = sql.strip()[:-1]

if bindings is None:
self._cursor.execute(sql)
else:
bindings = [self._fix_binding(binding) for binding in bindings]
self._cursor.execute(sql, *bindings)

@property
def description(self):
return self._cursor.description

@classmethod
def _fix_binding(cls, value):
"""Convert complex datatypes to primitives that can be loaded by
the Spark driver"""
if isinstance(value, NUMBERS):
return float(value)
elif isinstance(value, dt.datetime):
return f"'{value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}'"
else:
return f"'{value}'"
2 changes: 1 addition & 1 deletion dev_requirements.txt
@@ -16,5 +16,5 @@ pytest-csv

# Test requirements
pytest-dbt-adapter==0.6.0
sasl==0.2.1
sasl>=0.2.1
thrift_sasl==0.4.1
10 changes: 7 additions & 3 deletions setup.py
@@ -61,7 +61,10 @@ def _get_dbt_core_version():
'PyHive[hive]>=0.6.0,<0.7.0',
'thrift>=0.11.0,<0.16.0',
]
all_extras = odbc_extras + pyhive_extras
session_extras = [
"pyspark>=3.0.0,<4.0.0"
]
all_extras = odbc_extras + pyhive_extras + session_extras

setup(
name=package_name,
@@ -83,8 +86,9 @@ def _get_dbt_core_version():
],
extras_require={
"ODBC": odbc_extras,
"PyHive": pyhive_extras,
"all": all_extras
"PyHive": pyhive_extras,
"session": session_extras,
"all": all_extras,
},
zip_safe=False,
classifiers=[
12 changes: 3 additions & 9 deletions tests/integration/conftest.py
@@ -1,10 +1,4 @@
def pytest_configure(config):
config.addinivalue_line(
"markers", "profile_databricks_cluster"
)
config.addinivalue_line(
"markers", "profile_databricks_sql_endpoint"
)
config.addinivalue_line(
"markers", "profile_apache_spark"
)
config.addinivalue_line("markers", "profile_databricks_cluster")
config.addinivalue_line("markers", "profile_databricks_sql_endpoint")
config.addinivalue_line("markers", "profile_apache_spark")
17 changes: 17 additions & 0 deletions tests/specs/spark-session.dbtspec
@@ -0,0 +1,17 @@
target:
type: spark
method: session
host: localhost
schema: "analytics_{{ var('_dbt_random_suffix') }}"
sequences:
test_dbt_empty: empty
# requires a metastore for persisting over dbt runs
# test_dbt_base: base
# test_dbt_ephemeral: ephemeral
# test_dbt_incremental: incremental
# snapshots require delta format
# test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp
# test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols
test_dbt_data_test: data_test
test_dbt_schema_test: schema_test
test_dbt_ephemeral_data_tests: data_test_ephemeral_models
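To iterate on this spec locally without the tox wrapper, something like the following should work, assuming `pytest-dbt-adapter` and the `session` extra are installed in the active environment (this mirrors the command the tox env below runs):

```python
import pytest

# Run the new spec file directly through pytest's API; exit with its status.
raise SystemExit(pytest.main(["-v", "tests/specs/spark-session.dbtspec"]))
```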
12 changes: 12 additions & 0 deletions tox.ini
@@ -57,3 +57,15 @@ deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev_requirements.txt
-e.

[testenv:integration-spark-session]
basepython = python3
commands = /bin/bash -c '{envpython} -m pytest -v tests/specs/spark-session.dbtspec'
passenv =
DBT_*
PYTEST_*
PIP_CACHE_DIR
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev_requirements.txt
-e.[session]