Forked from kedro-org/kedro-plugins
# Snowpark (Snowflake) dataset for kedro (kedro-org#104)
* Add Snowpark datasets

Signed-off-by: Vladimir Filimonov <vladimir_filimonov@mckinsey.com>
Signed-off-by: heber-urdaneta <heber_urdaneta@mckinsey.com>
Signed-off-by: Danny Farah <danny_farah@mckinsey.com>
1 parent 8712385 · commit f1a03c5

Showing 12 changed files with 476 additions and 3 deletions.
**`kedro-datasets/kedro_datasets/snowflake/__init__.py`** (new file, 8 additions)

```python
"""Provides I/O modules for Snowflake."""

__all__ = ["SnowparkTableDataSet"]

from contextlib import suppress

with suppress(ImportError):
    from .snowpark_dataset import SnowparkTableDataSet
```
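The `suppress(ImportError)` guard means the package imports cleanly even when `snowflake-snowpark-python` is not installed; the dataset class is simply not exposed. A small illustrative check (the printed hint is an illustration, not from the source):

```python
# If the snowpark dependency is missing, the package still imports,
# but SnowparkTableDataSet is absent from it.
import kedro_datasets.snowflake as snowflake_io

if not hasattr(snowflake_io, "SnowparkTableDataSet"):
    print("snowflake-snowpark-python not installed; SnowparkTableDataSet unavailable")
```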
**`kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py`** (new file, 232 additions)
"""``AbstractDataSet`` implementation to access Snowflake using Snowpark dataframes | ||
""" | ||
import logging | ||
from copy import deepcopy | ||
from typing import Any, Dict | ||
|
||
import snowflake.snowpark as sp | ||
from kedro.io.core import AbstractDataSet, DataSetError | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class SnowparkTableDataSet(AbstractDataSet): | ||
"""``SnowparkTableDataSet`` loads and saves Snowpark dataframes. | ||
As of Mar-2023, the snowpark connector only works with Python 3.8. | ||
Example usage for the | ||
`YAML API <https://kedro.readthedocs.io/en/stable/data/\ | ||
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_: | ||
.. code-block:: yaml | ||
weather: | ||
type: kedro_datasets.snowflake.SnowparkTableDataSet | ||
table_name: "weather_data" | ||
database: "meteorology" | ||
schema: "observations" | ||
credentials: db_credentials | ||
save_args: | ||
mode: overwrite | ||
column_order: name | ||
table_type: '' | ||
You can skip everything but "table_name" if the database and | ||
schema are provided via credentials. That way catalog entries can be shorter | ||
if, for example, all used Snowflake tables live in same database/schema. | ||
Values in the dataset definition take priority over those defined in credentials. | ||
Example: | ||
Credentials file provides all connection attributes, catalog entry | ||
"weather" reuses credentials parameters, "polygons" catalog entry reuses | ||
all credentials parameters except providing a different schema name. | ||
Second example of credentials file uses ``externalbrowser`` authentication. | ||
catalog.yml | ||
.. code-block:: yaml | ||
weather: | ||
type: kedro_datasets.snowflake.SnowparkTableDataSet | ||
table_name: "weather_data" | ||
database: "meteorology" | ||
schema: "observations" | ||
credentials: snowflake_client | ||
save_args: | ||
mode: overwrite | ||
column_order: name | ||
table_type: '' | ||
polygons: | ||
type: kedro_datasets.snowflake.SnowparkTableDataSet | ||
table_name: "geopolygons" | ||
credentials: snowflake_client | ||
schema: "geodata" | ||
credentials.yml | ||
.. code-block:: yaml | ||
snowflake_client: | ||
account: 'ab12345.eu-central-1' | ||
port: 443 | ||
warehouse: "datascience_wh" | ||
database: "detailed_data" | ||
schema: "observations" | ||
user: "service_account_abc" | ||
password: "supersecret" | ||
credentials.yml (with externalbrowser authenticator) | ||
.. code-block:: yaml | ||
snowflake_client: | ||
account: 'ab12345.eu-central-1' | ||
port: 443 | ||
warehouse: "datascience_wh" | ||
database: "detailed_data" | ||
schema: "observations" | ||
user: "john_doe@wdomain.com" | ||
authenticator: "externalbrowser" | ||
""" | ||
|
||
# this dataset cannot be used with ``ParallelRunner``, | ||
# therefore it has the attribute ``_SINGLE_PROCESS = True`` | ||
# for parallelism within a pipeline please consider | ||
# ``ThreadRunner`` instead | ||
_SINGLE_PROCESS = True | ||
DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any] | ||
DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any] | ||
|
||
def __init__( # pylint: disable=too-many-arguments | ||
self, | ||
table_name: str, | ||
schema: str = None, | ||
database: str = None, | ||
load_args: Dict[str, Any] = None, | ||
save_args: Dict[str, Any] = None, | ||
credentials: Dict[str, Any] = None, | ||
) -> None: | ||
"""Creates a new instance of ``SnowparkTableDataSet``. | ||
Args: | ||
table_name: The table name to load or save data to. | ||
schema: Name of the schema where ``table_name`` is. | ||
Optional as can be provided as part of ``credentials`` | ||
dictionary. Argument value takes priority over one provided | ||
in ``credentials`` if any. | ||
database: Name of the database where ``schema`` is. | ||
Optional as can be provided as part of ``credentials`` | ||
dictionary. Argument value takes priority over one provided | ||
in ``credentials`` if any. | ||
load_args: Currently not used | ||
save_args: Provided to underlying snowpark ``save_as_table`` | ||
To find all supported arguments, see here: | ||
https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameWriter.saveAsTable.html | ||
credentials: A dictionary with a snowpark connection string. | ||
To find all supported arguments, see here: | ||
https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect | ||
""" | ||
|
||
if not table_name: | ||
raise DataSetError("'table_name' argument cannot be empty.") | ||
|
||
if not credentials: | ||
raise DataSetError("'credentials' argument cannot be empty.") | ||
|
||
if not database: | ||
if not ("database" in credentials and credentials["database"]): | ||
raise DataSetError( | ||
"'database' must be provided by credentials or dataset." | ||
) | ||
database = credentials["database"] | ||
|
||
if not schema: | ||
if not ("schema" in credentials and credentials["schema"]): | ||
raise DataSetError( | ||
"'schema' must be provided by credentials or dataset." | ||
) | ||
schema = credentials["schema"] | ||
# Handle default load and save arguments | ||
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) | ||
if load_args is not None: | ||
self._load_args.update(load_args) | ||
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) | ||
if save_args is not None: | ||
self._save_args.update(save_args) | ||
|
||
self._table_name = table_name | ||
self._database = database | ||
self._schema = schema | ||
|
||
connection_parameters = credentials | ||
connection_parameters.update( | ||
{"database": self._database, "schema": self._schema} | ||
) | ||
self._connection_parameters = connection_parameters | ||
self._session = self._get_session(self._connection_parameters) | ||
|
||
def _describe(self) -> Dict[str, Any]: | ||
return { | ||
"table_name": self._table_name, | ||
"database": self._database, | ||
"schema": self._schema, | ||
} | ||
|
||
@staticmethod | ||
def _get_session(connection_parameters) -> sp.Session: | ||
"""Given a connection string, create singleton connection | ||
to be used across all instances of `SnowparkTableDataSet` that | ||
need to connect to the same source. | ||
connection_parameters is a dictionary of any values | ||
supported by snowflake python connector: | ||
https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect | ||
example: | ||
connection_parameters = { | ||
"account": "", | ||
"user": "", | ||
"password": "", (optional) | ||
"role": "", (optional) | ||
"warehouse": "", (optional) | ||
"database": "", (optional) | ||
"schema": "", (optional) | ||
"authenticator: "" (optional) | ||
} | ||
""" | ||
try: | ||
logger.debug("Trying to reuse active snowpark session...") | ||
session = sp.context.get_active_session() | ||
except sp.exceptions.SnowparkSessionException: | ||
logger.debug("No active snowpark session found. Creating") | ||
session = sp.Session.builder.configs(connection_parameters).create() | ||
return session | ||
|
||
def _load(self) -> sp.DataFrame: | ||
table_name = [ | ||
self._database, | ||
self._schema, | ||
self._table_name, | ||
] | ||
|
||
sp_df = self._session.table(".".join(table_name)) | ||
return sp_df | ||
|
||
def _save(self, data: sp.DataFrame) -> None: | ||
table_name = [ | ||
self._database, | ||
self._schema, | ||
self._table_name, | ||
] | ||
|
||
data.write.save_as_table(table_name, **self._save_args) | ||
|
||
def _exists(self) -> bool: | ||
session = self._session | ||
query = "SELECT COUNT(*) FROM {database}.INFORMATION_SCHEMA.TABLES \ | ||
WHERE TABLE_SCHEMA = '{schema}' \ | ||
AND TABLE_NAME = '{table_name}'" | ||
rows = session.sql( | ||
query.format( | ||
database=self._database, | ||
schema=self._schema, | ||
table_name=self._table_name, | ||
) | ||
).collect() | ||
return rows[0][0] == 1 |
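For orientation, a minimal Python-API sketch using this dataset, assuming a reachable Snowflake account. All connection values are placeholders taken from the docstring examples; `load()` and `save()` are the public wrappers kedro's `AbstractDataSet` provides around `_load`/`_save`:

```python
from kedro_datasets.snowflake import SnowparkTableDataSet

credentials = {  # placeholder values
    "account": "ab12345.eu-central-1",
    "user": "service_account_abc",
    "password": "supersecret",
    "warehouse": "datascience_wh",
}

weather = SnowparkTableDataSet(
    table_name="weather_data",
    database="meteorology",
    schema="observations",
    credentials=credentials,
    save_args={"mode": "overwrite"},
)

df = weather.load()   # returns a snowflake.snowpark.DataFrame
weather.save(df)      # writes back via DataFrameWriter.save_as_table
```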
**README.md** for the Snowpark tests folder (new file, 34 additions)
# Snowpark connector testing

Running the automated tests for the Snowpark connector requires access to a real Snowflake instance. The tests located in this folder are therefore **disabled** by default and excluded from the pytest execution scope via [conftest.py](conftest.py).

[Makefile](/Makefile) provides a separate target, ``test-snowflake-only``, to run only the tests related to the Snowpark connector. To run the tests, you need to provide Snowflake connection parameters via environment variables:
* SNOWSQL_ACCOUNT - Snowflake account name with region, e.g. `ab12345.eu-central-2`
* SNOWSQL_WAREHOUSE - Snowflake virtual warehouse to use
* SNOWSQL_DATABASE - Database to use
* SNOWSQL_SCHEMA - Schema to use when creating tables for tests
* SNOWSQL_ROLE - Role to use for connection
* SNOWSQL_USER - Username to use for connection
* SNOWSQL_PWD - Plain password to use for connection

All environment variables need to be provided for the tests to run.

Here is an example shell command to run the snowpark tests via the make utility:
```bash
SNOWSQL_ACCOUNT='ab12345.eu-central-2' SNOWSQL_WAREHOUSE='DEV_WH' SNOWSQL_DATABASE='DEV_DB' SNOWSQL_ROLE='DEV_ROLE' SNOWSQL_USER='DEV_USER' SNOWSQL_SCHEMA='DATA' SNOWSQL_PWD='supersecret' make test-snowflake-only
```

Currently the tests support only simple username & password authentication, not SSO/MFA.

As of Mar-2023, the snowpark connector only works with Python 3.8.

## Snowflake permissions required
Credentials provided via environment variables should have the following permissions granted for the tests to run successfully:
* Create tables in a given schema
* Drop tables in a given schema
* Insert rows into tables in a given schema
* Query tables in a given schema
* Query `INFORMATION_SCHEMA.TABLES` of the respective database

## Extending tests
Contributors adding new tests should add the `@pytest.mark.snowflake` decorator to each test; a skeleton is sketched below. The exclusion of Snowpark-related tests from the overall execution scope in [conftest.py](conftest.py) works based on markers.
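For illustration, a minimal test skeleton carrying the required marker (the test name and body are placeholders, not from the repository; only the marker usage is prescribed by this README):

```python
import pytest


@pytest.mark.snowflake
def test_snowflake_placeholder():
    # Always collected, but skipped unless pytest runs with `-m snowflake`,
    # thanks to the conftest.py hook shown below.
    assert True
```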
One further file added by the commit is empty.

**conftest.py** for the Snowpark tests folder (new file, 24 additions)
""" | ||
We disable execution of tests that require real Snowflake instance | ||
to run by default. Providing -m snowflake option explicitly to | ||
pytest will make these and only these tests run | ||
""" | ||
import pytest | ||
|
||
|
||
def pytest_collection_modifyitems(config, items): | ||
markers_arg = config.getoption("-m") | ||
|
||
# Naive implementation to handle basic marker expressions | ||
# Will not work if someone will (ever) run pytest with complex marker | ||
# expressions like "-m spark and not (snowflake or pandas)" | ||
if ( | ||
"snowflake" in markers_arg.lower() | ||
and "not snowflake" not in markers_arg.lower() | ||
): | ||
return | ||
|
||
skip_snowflake = pytest.mark.skip(reason="need -m snowflake option to run") | ||
for item in items: | ||
if "snowflake" in item.keywords: | ||
item.add_marker(skip_snowflake) |
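Under this hook, a plain `pytest` run skips every snowflake-marked test, while `pytest -m snowflake` runs only those tests. A programmatic equivalent of the latter invocation, as a sketch:

```python
# Equivalent to `pytest -m snowflake` on the command line;
# pytest.main takes a list of CLI arguments and returns an exit code.
import sys

import pytest

sys.exit(pytest.main(["-m", "snowflake"]))
```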