diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 05bdde32..a9099967 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -290,6 +290,51 @@ jobs: name: fsspec_results_${{ matrix.python-version }}-${{ steps.date.outputs.date }}.csv path: fsspec_results.csv + plugins: + name: plugins test / python ${{ matrix.python-version }} + + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + python-version: ['3.9'] + + env: + TOXENV: "plugins" + PYTEST_ADDOPTS: "-v --color=yes --csv plugins_results.csv" + + steps: + - name: Check out the repository + uses: actions/checkout@v3 + with: + persist-credentials: false + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install python dependencies + run: | + python -m pip install tox + python -m pip --version + tox --version + + - name: Run tox + run: tox + + - name: Get current date + if: always() + id: date + run: echo "date=$(date +'%Y-%m-%dT%H_%M_%S')" >> $GITHUB_OUTPUT #no colons allowed for artifacts + + - uses: actions/upload-artifact@v3 + if: always() + with: + name: plugins_results_${{ matrix.python-version }}-${{ steps.date.outputs.date }}.csv + path: plugins_results.csv + build: name: build packages diff --git a/.gitignore b/.gitignore index 9535a289..7f94f1ff 100644 --- a/.gitignore +++ b/.gitignore @@ -77,3 +77,4 @@ target/ .DS_Store .idea/ +.vscode/ diff --git a/dbt/adapters/duckdb/buenavista.py b/dbt/adapters/duckdb/buenavista.py index 0936b225..e6ed4af6 100644 --- a/dbt/adapters/duckdb/buenavista.py +++ b/dbt/adapters/duckdb/buenavista.py @@ -3,6 +3,7 @@ import psycopg2 from . import credentials +from . import utils from .environments import Environment from dbt.contracts.connection import AdapterResponse @@ -29,6 +30,9 @@ def handle(self): cursor.close() return conn + def get_binding_char(self) -> str: + return "%s" + def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: identifier = parsed_model["alias"] payload = { @@ -42,5 +46,16 @@ def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> A handle.cursor().execute(json.dumps(payload)) return AdapterResponse(_message="OK") - def get_binding_char(self) -> str: - return "%s" + def load_source(self, plugin_name: str, source_config: utils.SourceConfig): + handle = self.handle() + payload = { + "method": "dbt_load_source", + "params": { + "plugin_name": plugin_name, + "source_config": source_config.as_dict(), + }, + } + cursor = handle.cursor() + cursor.execute(json.dumps(payload)) + cursor.close() + handle.close() diff --git a/dbt/adapters/duckdb/connections.py b/dbt/adapters/duckdb/connections.py index 8ab60014..4f306eaa 100644 --- a/dbt/adapters/duckdb/connections.py +++ b/dbt/adapters/duckdb/connections.py @@ -14,12 +14,19 @@ class DuckDBConnectionManager(SQLConnectionManager): TYPE = "duckdb" - LOCK = threading.RLock() - ENV = None + _LOCK = threading.RLock() + _ENV = None def __init__(self, profile: AdapterRequiredConfig): super().__init__(profile) + @classmethod + def env(cls) -> environments.Environment: + with cls._LOCK: + if not cls._ENV: + raise Exception("DuckDBConnectionManager environment requested before creation!") + return cls._ENV + @classmethod def open(cls, connection: Connection) -> Connection: if connection.state == ConnectionState.OPEN: @@ -27,11 +34,11 @@ def open(cls, connection: Connection) -> Connection: return connection credentials = cls.get_credentials(connection.credentials) - with cls.LOCK: + with cls._LOCK: try: - if not cls.ENV: - cls.ENV = environments.create(credentials) - connection.handle = cls.ENV.handle() + if not cls._ENV: + cls._ENV = environments.create(credentials) + connection.handle = cls._ENV.handle() connection.state = ConnectionState.OPEN except RuntimeError as e: @@ -79,9 +86,9 @@ def get_response(cls, cursor) -> AdapterResponse: @classmethod def close_all_connections(cls): - with cls.LOCK: - if cls.ENV is not None: - cls.ENV = None + with cls._LOCK: + if cls._ENV is not None: + cls._ENV = None atexit.register(DuckDBConnectionManager.close_all_connections) diff --git a/dbt/adapters/duckdb/credentials.py b/dbt/adapters/duckdb/credentials.py index 4d0a4721..f6f93ed8 100644 --- a/dbt/adapters/duckdb/credentials.py +++ b/dbt/adapters/duckdb/credentials.py @@ -43,6 +43,20 @@ def to_sql(self) -> str: return base +@dataclass +class PluginConfig(dbtClassMixin): + # The name that this plugin will be referred to by in sources/models; must + # be unique within the project + name: str + + # The fully-specified class name of the plugin code to use, which must be a + # subclass of dbt.adapters.duckdb.plugins.Plugin. + impl: str + + # A plugin-specific set of configuration options + config: Optional[Dict[str, Any]] = None + + @dataclass class Remote(dbtClassMixin): host: str @@ -61,7 +75,7 @@ class DuckDBCredentials(Credentials): # to DuckDB (e.g., if we need to enable using unsigned extensions) config_options: Optional[Dict[str, Any]] = None - # any extensions we want to install and load (httpfs, parquet, etc.) + # any DuckDB extensions we want to install and load (httpfs, parquet, etc.) extensions: Optional[Tuple[str, ...]] = None # any additional pragmas we want to configure on our DuckDB connections; @@ -95,6 +109,11 @@ class DuckDBCredentials(Credentials): # Used to configure remote environments/connections remote: Optional[Remote] = None + # A list of dbt-duckdb plugins that can be used to customize the + # behavior of loading source data and/or storing the relations that are + # created by SQL or Python models; see the plugins module for more details. + plugins: Optional[List[PluginConfig]] = None + @classmethod def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]: data = super().__pre_deserialize__(data) diff --git a/dbt/adapters/duckdb/environments.py b/dbt/adapters/duckdb/environments.py index 8abfc13b..a667e2dd 100644 --- a/dbt/adapters/duckdb/environments.py +++ b/dbt/adapters/duckdb/environments.py @@ -1,10 +1,14 @@ +import abc import importlib.util import os import tempfile +from typing import Dict import duckdb from .credentials import DuckDBCredentials +from .plugins import Plugin +from .utils import SourceConfig from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError @@ -53,18 +57,21 @@ def cursor(self): return self._cursor -class Environment: +class Environment(abc.ABC): + @abc.abstractmethod def handle(self): - raise NotImplementedError - - def cursor(self): - raise NotImplementedError + pass + @abc.abstractmethod def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: - raise NotImplementedError + pass + + def get_binding_char(self) -> str: + return "?" - def close(self, cursor): - raise NotImplementedError + @abc.abstractmethod + def load_source(self, plugin_name: str, source_config: SourceConfig) -> str: + pass @classmethod def initialize_db(cls, creds: DuckDBCredentials): @@ -93,7 +100,7 @@ def initialize_db(cls, creds: DuckDBCredentials): return conn @classmethod - def initialize_cursor(cls, creds, cursor): + def initialize_cursor(cls, creds: DuckDBCredentials, cursor): # Extensions/settings need to be configured per cursor for ext in creds.extensions or []: cursor.execute(f"LOAD '{ext}'") @@ -103,6 +110,21 @@ def initialize_cursor(cls, creds, cursor): cursor.execute(f"SET {key} = '{value}'") return cursor + @classmethod + def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, Plugin]: + ret = {} + for plugin in creds.plugins or []: + if plugin.name in ret: + raise Exception("Duplicate plugin name: " + plugin.name) + else: + if plugin.impl in Plugin.WELL_KNOWN_PLUGINS: + plugin.impl = Plugin.WELL_KNOWN_PLUGINS[plugin.impl] + try: + ret[plugin.name] = Plugin.create(plugin.impl, plugin.config or {}) + except Exception as e: + raise Exception(f"Error attempting to create plugin {plugin.name}", e) + return ret + @classmethod def run_python_job(cls, con, load_df_function, identifier: str, compiled_code: str): mod_file = tempfile.NamedTemporaryFile(suffix=".py", delete=False) @@ -136,13 +158,11 @@ def run_python_job(cls, con, load_df_function, identifier: str, compiled_code: s finally: os.unlink(mod_file.name) - def get_binding_char(self) -> str: - return "?" - class LocalEnvironment(Environment): def __init__(self, credentials: DuckDBCredentials): self.conn = self.initialize_db(credentials) + self._plugins = self.initialize_plugins(credentials) self.creds = credentials def handle(self): @@ -159,6 +179,23 @@ def ldf(table_name): self.run_python_job(con, ldf, parsed_model["alias"], compiled_code) return AdapterResponse(_message="OK") + def load_source(self, plugin_name: str, source_config: SourceConfig): + if plugin_name not in self._plugins: + raise Exception( + f"Plugin {plugin_name} not found; known plugins are: " + + ",".join(self._plugins.keys()) + ) + df = self._plugins[plugin_name].load(source_config) + assert df is not None + handle = self.handle() + cursor = handle.cursor() + materialization = source_config.meta.get("materialization", "table") + cursor.execute( + f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df" + ) + cursor.close() + handle.close() + def close(self): if self.conn: self.conn.close() diff --git a/dbt/adapters/duckdb/impl.py b/dbt/adapters/duckdb/impl.py index 6bd3b125..b00e9749 100644 --- a/dbt/adapters/duckdb/impl.py +++ b/dbt/adapters/duckdb/impl.py @@ -85,7 +85,7 @@ def use_database(self) -> bool: @available def get_binding_char(self): - return DuckDBConnectionManager.ENV.get_binding_char() + return DuckDBConnectionManager.env().get_binding_char() @available def external_write_options(self, write_location: str, rendered_options: dict) -> str: @@ -144,11 +144,8 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str) -> AdapterRe connection = self.connections.get_if_exists() if not connection: connection = self.connections.get_thread_connection() - if DuckDBConnectionManager.ENV: - env = DuckDBConnectionManager.ENV - return env.submit_python_job(connection.handle, parsed_model, compiled_code) - else: - raise Exception("No ENV defined to execute dbt-duckdb python models!") + env = DuckDBConnectionManager.env() + return env.submit_python_job(connection.handle, parsed_model, compiled_code) def get_rows_different_sql( self, diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py new file mode 100644 index 00000000..ed37c726 --- /dev/null +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -0,0 +1,39 @@ +import abc +import importlib +from typing import Any +from typing import Dict + +from ..utils import SourceConfig +from dbt.dataclass_schema import dbtClassMixin + + +class PluginConfig(dbtClassMixin): + """A helper class for defining the configuration settings a particular plugin uses.""" + + pass + + +class Plugin(abc.ABC): + WELL_KNOWN_PLUGINS = { + "excel": "dbt.adapters.duckdb.plugins.excel.ExcelPlugin", + "gsheet": "dbt.adapters.duckdb.plugins.gsheet.GSheetPlugin", + "iceberg": "dbt.adapters.duckdb.plugins.iceberg.IcebergPlugin", + "sqlalchemy": "dbt.adapters.duckdb.plugins.sqlalchemy.SQLAlchemyPlugin", + } + + @classmethod + def create(cls, impl: str, config: Dict[str, Any]) -> "Plugin": + module_name, class_name = impl.rsplit(".", 1) + module = importlib.import_module(module_name) + Class = getattr(module, class_name) + if not issubclass(Class, Plugin): + raise TypeError(f"{impl} is not a subclass of Plugin") + return Class(config) + + @abc.abstractmethod + def __init__(self, plugin_config: Dict): + pass + + def load(self, source_config: SourceConfig): + """Load data from a source config and return it as a DataFrame-like object that DuckDB can read.""" + raise NotImplementedError diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py new file mode 100644 index 00000000..582527c9 --- /dev/null +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -0,0 +1,19 @@ +import pathlib +from typing import Dict + +import pandas as pd + +from . import Plugin +from ..utils import SourceConfig + + +class ExcelPlugin(Plugin): + def __init__(self, config: Dict): + self._config = config + + def load(self, source_config: SourceConfig): + ext_location = source_config.meta["external_location"] + ext_location = ext_location.format(**source_config.as_dict()) + source_location = pathlib.Path(ext_location.strip("'")) + sheet_name = source_config.meta.get("sheet_name", 0) + return pd.read_excel(source_location, sheet_name=sheet_name) diff --git a/dbt/adapters/duckdb/plugins/gsheet.py b/dbt/adapters/duckdb/plugins/gsheet.py new file mode 100644 index 00000000..1b2621cc --- /dev/null +++ b/dbt/adapters/duckdb/plugins/gsheet.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass +from typing import Dict +from typing import Literal + +import gspread +import pandas as pd + +from . import Plugin +from . import PluginConfig +from ..utils import SourceConfig + + +@dataclass +class GSheetConfig(PluginConfig): + method: Literal["service", "oauth"] + + def client(self): + if self.method == "service": + return gspread.service_account() + else: + return gspread.oauth() + + +class GSheetPlugin(Plugin): + def __init__(self, config: Dict): + self._config = GSheetConfig.from_dict(config) + self._gc = self._config.client() + + def load(self, source_config: SourceConfig): + doc = None + if "title" in source_config.meta: + doc = self._gc.open(source_config.meta["title"]) + elif "key" in source_config.meta: + doc = self._gc.open_by_key(source_config.meta["key"]) + elif "url" in source_config.meta: + doc = self._gc.open_by_url(source_config.meta["url"]) + else: + raise Exception("Source config did not indicate a method to open a GSheet to read") + + sheet = None + if "worksheet" in source_config.meta: + work_id = source_config.meta["worksheet"] + if isinstance(work_id, int): + sheet = doc.get_worksheet(work_id) + elif isinstance(work_id, str): + sheet = doc.worksheet(work_id) + else: + raise Exception( + f"Could not identify a worksheet in the doc from identifier: {work_id}" + ) + else: + sheet = doc.sheet1 + + return pd.DataFrame(sheet.get_all_records()) diff --git a/dbt/adapters/duckdb/plugins/iceberg.py b/dbt/adapters/duckdb/plugins/iceberg.py new file mode 100644 index 00000000..ffc5cb3c --- /dev/null +++ b/dbt/adapters/duckdb/plugins/iceberg.py @@ -0,0 +1,26 @@ +from typing import Dict + +from pyiceberg import catalog + +from . import Plugin +from ..utils import SourceConfig + + +class IcebergPlugin(Plugin): + def __init__(self, config: Dict): + self._catalog = catalog.load_catalog(config.get("catalog")) + + def load(self, source_config: SourceConfig): + table_format = source_config.meta.get("iceberg_table", "{schema}.{identifier}") + table_name = table_format.format(**source_config.as_dict()) + table = self._catalog.load_table(table_name) + scan_keys = { + "row_filter", + "selected_fields", + "case_sensitive", + "snapshot_id", + "options", + "limit", + } + scan_config = {k: source_config.meta[k] for k in scan_keys if k in source_config.meta} + return table.scan(**scan_config).to_arrow() diff --git a/dbt/adapters/duckdb/plugins/sqlalchemy.py b/dbt/adapters/duckdb/plugins/sqlalchemy.py new file mode 100644 index 00000000..e9b7b2e4 --- /dev/null +++ b/dbt/adapters/duckdb/plugins/sqlalchemy.py @@ -0,0 +1,29 @@ +from typing import Any +from typing import Dict + +import pandas as pd +from sqlalchemy import create_engine +from sqlalchemy import text + +from . import Plugin +from ..utils import SourceConfig + + +class SQLAlchemyPlugin(Plugin): + def __init__(self, plugin_config: Dict[str, Any]): + self.engine = create_engine(plugin_config["connection_url"]) + + def load(self, source_config: SourceConfig) -> pd.DataFrame: + if "query" in source_config.meta: + query = source_config.meta["query"] + query = query.format(**source_config.as_dict()) + params = source_config.meta.get("params", {}) + with self.engine.connect() as conn: + return pd.read_sql_query(text(query), con=conn, params=params) + else: + if "table" in source_config.meta: + table = source_config.meta["table"] + else: + table = source_config.table_name() + with self.engine.connect() as conn: + return pd.read_sql_table(table, con=conn) diff --git a/dbt/adapters/duckdb/relation.py b/dbt/adapters/duckdb/relation.py index 0aea4604..52265810 100644 --- a/dbt/adapters/duckdb/relation.py +++ b/dbt/adapters/duckdb/relation.py @@ -3,6 +3,8 @@ from typing import Optional from typing import Type +from .connections import DuckDBConnectionManager +from .utils import SourceConfig from dbt.adapters.base.relation import BaseRelation from dbt.adapters.base.relation import Self from dbt.contracts.graph.nodes import SourceDefinition @@ -10,44 +12,35 @@ @dataclass(frozen=True, eq=False, repr=False) class DuckDBRelation(BaseRelation): - external_location: Optional[str] = None + external: Optional[str] = None @classmethod def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self: - - # Some special handling here to allow sources that are external files to be specified - # via a `external_location` meta field. If the source's meta field is used, we include - # some logic to allow basic templating of the external location based on the individual - # name or identifier for the table itself to cut down on boilerplate. - ext_location = None - if "external_location" in source.meta: - ext_location = source.meta["external_location"] - elif "external_location" in source.source_meta: - # Use str.format here to allow for some basic templating outside of Jinja - ext_location = source.source_meta["external_location"] - - if ext_location: + source_config = SourceConfig.create(source) + # First check to see if a 'plugin' is defined in the meta argument for + # the source or its parent configuration, and if it is, use the environment + # associated with this run to get the name of the source that we should + # reference in the compiled model + if "plugin" in source_config.meta: + plugin_name = source_config.meta["plugin"] + DuckDBConnectionManager.env().load_source(plugin_name, source_config) + elif "external_location" in source_config.meta: # Call str.format with the schema, name and identifier for the source so that they # can be injected into the string; this helps reduce boilerplate when all # of the tables in the source have a similar location based on their name # and/or identifier. - format_args = { - "schema": source.schema, - "name": source.name, - "identifier": source.identifier, - } - if source.meta: - format_args.update(source.meta) - ext_location = ext_location.format(**format_args) + ext_location = source_config.meta["external_location"].format( + **source_config.as_dict() + ) # If it's a function call or already has single quotes, don't add them if "(" not in ext_location and not ext_location.startswith("'"): ext_location = f"'{ext_location}'" - kwargs["external_location"] = ext_location + kwargs["external"] = ext_location return super().create_from_source(source, **kwargs) # type: ignore def render(self) -> str: - if self.external_location: - return self.external_location + if self.external: + return self.external else: return super().render() diff --git a/dbt/adapters/duckdb/utils.py b/dbt/adapters/duckdb/utils.py new file mode 100644 index 00000000..c8e6d14d --- /dev/null +++ b/dbt/adapters/duckdb/utils.py @@ -0,0 +1,43 @@ +from dataclasses import dataclass +from typing import Any +from typing import Dict +from typing import Optional + +from dbt.contracts.graph.nodes import SourceDefinition + + +@dataclass +class SourceConfig: + name: str + identifier: str + schema: str + database: Optional[str] + meta: Dict[str, Any] + + def table_name(self) -> str: + if self.database: + return ".".join([self.database, self.schema, self.identifier]) + else: + return ".".join([self.schema, self.identifier]) + + def as_dict(self) -> Dict[str, Any]: + base = { + "name": self.name, + "identifier": self.identifier, + "schema": self.schema, + "database": self.database, + } + base.update(self.meta) + return base + + @classmethod + def create(cls, source: SourceDefinition) -> "SourceConfig": + meta = source.source_meta.copy() + meta.update(source.meta) + return SourceConfig( + name=source.name, + identifier=source.identifier, + schema=source.schema, + database=source.database, + meta=meta, + ) diff --git a/dev-requirements.txt b/dev-requirements.txt index 99a2c611..992ac7ea 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -15,8 +15,10 @@ flake8 flaky freezegun==1.2.2 fsspec +gspread ipdb mypy==1.2.0 +openpyxl pip-tools pre-commit psycopg2-binary @@ -27,6 +29,7 @@ pytest-csv pytest-xdist pytest-mock pytz +sqlalchemy tox>=3.13 twine wheel diff --git a/tests/conftest.py b/tests/conftest.py index 60a57a9f..a870f471 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import os import subprocess import time @@ -65,3 +66,9 @@ def skip_by_profile_type(profile_type, request): for skip_profile_type in request.node.get_closest_marker("skip_profile").args: if skip_profile_type == profile_type: pytest.skip(f"skipped on '{profile_type}' profile") + + +@pytest.fixture(scope="session") +def test_data_path(): + test_dir = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(test_dir, "data") \ No newline at end of file diff --git a/tests/data/excel_file.xlsx b/tests/data/excel_file.xlsx new file mode 100644 index 00000000..ffe12708 Binary files /dev/null and b/tests/data/excel_file.xlsx differ diff --git a/tests/functional/adapter/test_sources.py b/tests/functional/adapter/test_sources.py index 500c3e10..0c1f9448 100644 --- a/tests/functional/adapter/test_sources.py +++ b/tests/functional/adapter/test_sources.py @@ -1,7 +1,6 @@ import os import pytest -import yaml from dbt.tests.util import run_dbt sources_schema_yml = """version: 2 @@ -35,14 +34,6 @@ class TestExternalSources: - @pytest.fixture(scope="class", autouse=True) - def setEnvVars(self): - os.environ["DBT_TEST_SCHEMA_NAME_VARIABLE"] = "test_run_schema" - - yield - - del os.environ["DBT_TEST_SCHEMA_NAME_VARIABLE"] - @pytest.fixture(scope="class") def models(self): return { @@ -51,13 +42,6 @@ def models(self): "multi_source_model.sql": models_multi_source_model_sql, } - def run_dbt_with_vars(self, project, cmd, *args, **kwargs): - vars_dict = { - "test_run_schema": project.test_schema, - } - cmd.extend(["--vars", yaml.safe_dump(vars_dict)]) - return run_dbt(cmd, *args, **kwargs) - @pytest.fixture(scope="class") def seeds_source_file(self): with open("/tmp/seeds_source_something.csv", "w") as f: @@ -73,7 +57,7 @@ def ost_file(self): os.unlink("/tmp/seeds_other_source_table.csv") def test_external_sources(self, seeds_source_file, ost_file, project): - results = self.run_dbt_with_vars(project, ["run"]) + results = run_dbt(["run"]) assert len(results) == 2 - test_results = self.run_dbt_with_vars(project, ["test"]) + test_results = run_dbt(["test"]) assert len(test_results) == 2 diff --git a/tests/functional/plugins/test_gsheet.py b/tests/functional/plugins/test_gsheet.py new file mode 100644 index 00000000..a7a5ab11 --- /dev/null +++ b/tests/functional/plugins/test_gsheet.py @@ -0,0 +1,85 @@ +import pytest + +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +sources_schema_yml = """ +version: 2 +sources: + - name: gsheet_source + schema: main + meta: + plugin: gsheet + title: "Josh's Test Spreadsheet" + tables: + - name: gsheet1 + description: "My first sheet" + - name: gsheet2 + description: "The second sheet in the doc" + meta: + worksheet: "TwoSheet" +""" + +models_source_model1_sql = """ + select * from {{ source('gsheet_source', 'gsheet1') }} +""" +models_source_model2_sql = """ + select * from {{ source('gsheet_source', 'gsheet2') }} +""" + + +# Skipping this b/c it requires using my (@jwills) personal creds +# when testing it locally and also b/c I think there is something +# wrong with profiles_config_update since it can't be used in multiple +# tests in the same pytest session +@pytest.mark.skip +class TestGSheetPlugin: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + config = {"method": "oauth"} + if "path" not in dbt_profile_target: + return {} + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target["path"], + "plugins": [ + {"name": "gsheet", "impl": "gsheet", "config": config} + ], + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self, test_data_path): + return { + "schema.yml": sources_schema_yml.format(test_data_path=test_data_path), + "source_model1.sql": models_source_model1_sql, + "source_model2.sql": models_source_model2_sql, + } + + def test_gshseet_plugin(self, project): + results = run_dbt() + assert len(results) == 2 + + check_relations_equal( + project.adapter, + [ + "gsheet1", + "source_model1", + ], + ) + + check_relations_equal( + project.adapter, + [ + "gsheet2", + "source_model2", + ], + ) diff --git a/tests/functional/plugins/test_iceberg.py b/tests/functional/plugins/test_iceberg.py new file mode 100644 index 00000000..79a1c21c --- /dev/null +++ b/tests/functional/plugins/test_iceberg.py @@ -0,0 +1,71 @@ +import pytest + +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +sources_schema_yml = """ +version: 2 +sources: + - name: iceberg_source + schema: main + meta: + plugin: icee + iceberg_table: "examples.{identifier}" + tables: + - name: nyc_taxi_locations +""" + +models_source_model1_sql = """ + select * from {{ source('iceberg_source', 'nyc_taxi_locations') }} +""" + + +# Skipping this b/c it requires using my (@jwills) personal creds +# when testing it locally and also b/c I think there is something +# wrong with profiles_config_update since it can't be used in multiple +# tests in the same pytest session +@pytest.mark.skip +class TestIcebergPlugin: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + config = {"catalog": "default"} + if "path" not in dbt_profile_target: + return {} + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target["path"], + "plugins": [ + {"name": "icee", "impl": "iceberg", "config": config} + ], + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self): + return { + "schema.yml": sources_schema_yml, + "source_model1.sql": models_source_model1_sql, + } + + def test_iceberg_plugin(self, project): + results = run_dbt() + assert len(results) == 1 + + res = project.run_sql("SELECT COUNT(1) FROM nyc_taxi_locations", fetch="one") + assert res[0] == 265 + + check_relations_equal( + project.adapter, + [ + "nyc_taxi_locations", + "source_model1", + ], + ) \ No newline at end of file diff --git a/tests/functional/plugins/test_plugins.py b/tests/functional/plugins/test_plugins.py new file mode 100644 index 00000000..dbaca109 --- /dev/null +++ b/tests/functional/plugins/test_plugins.py @@ -0,0 +1,140 @@ +import os +import pytest +import sqlite3 + +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +excel_schema_yml = """ +version: 2 +sources: + - name: excel_source + schema: main + meta: + plugin: excel + tables: + - name: excel_file + description: "An excel file" + meta: + external_location: "{test_data_path}/excel_file.xlsx" +""" + +sqlalchemy_schema_yml = """ +version: 2 +sources: + - name: sql_source + schema: main + meta: + plugin: sql + tables: + - name: tt1 + description: "My first SQLAlchemy table" + meta: + query: "SELECT * FROM {identifier} WHERE id=:id" + params: + id: 1 + - name: tt2 + meta: + table: "test_table2" +""" + + +excel1_sql = """ + select * from {{ source('excel_source', 'excel_file') }} +""" +sqlalchemy1_sql = """ + select * from {{ source('sql_source', 'tt1') }} +""" +sqlalchemy2_sql = """ + select * from {{ source('sql_source', 'tt2') }} +""" + + +@pytest.mark.skip_profile("buenavista") +class TestPlugins: + @pytest.fixture(scope="class") + def sqlite_test_db(self): + path = "/tmp/satest.db" + db = sqlite3.connect(path) + cursor = db.cursor() + cursor.execute("CREATE TABLE tt1 (id int, name text)") + cursor.execute("INSERT INTO tt1 VALUES (1, 'John Doe')") + cursor.execute("INSERT INTO tt1 VALUES (2, 'Jane Smith')") + cursor.execute("CREATE TABLE test_table2 (a int, b int, c int)") + cursor.execute("INSERT INTO test_table2 VALUES (1, 2, 3), (4, 5, 6)") + cursor.close() + db.commit() + db.close() + yield path + os.unlink(path) + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target, sqlite_test_db): + if "path" not in dbt_profile_target: + return {} + + config = {"connection_url": f"sqlite:///{sqlite_test_db}"} + plugins = [ + {"name": "excel", "impl": "excel"}, + {"name": "sql", "impl": "sqlalchemy", "config": config}, + ] + + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target["path"], + "plugins": plugins, + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self, test_data_path): + return { + "schema_excel.yml": excel_schema_yml.format(test_data_path=test_data_path), + "schema_sqlalchemy.yml": sqlalchemy_schema_yml, + "excel.sql": excel1_sql, + "sqlalchemy1.sql": sqlalchemy1_sql, + "sqlalchemy2.sql": sqlalchemy2_sql, + } + + def test_plugins(self, project): + results = run_dbt() + assert len(results) == 3 + + res = project.run_sql("SELECT COUNT(1) FROM excel_file", fetch="one") + assert res[0] == 9 + + check_relations_equal( + project.adapter, + [ + "excel_file", + "excel", + ], + ) + + res = project.run_sql("SELECT COUNT(1) FROM tt1", fetch="one") + assert res[0] == 1 + check_relations_equal( + project.adapter, + [ + "tt1", + "sqlalchemy1", + ], + ) + + res = project.run_sql("SELECT COUNT(1) FROM tt2", fetch="one") + assert res[0] == 2 + check_relations_equal( + project.adapter, + [ + "tt2", + "sqlalchemy2", + ], + ) diff --git a/tox.ini b/tox.ini index 58c0a094..bcd4314a 100644 --- a/tox.ini +++ b/tox.ini @@ -13,7 +13,7 @@ deps = [testenv:{functional,py37,py38,py39,py310,py311,py}] -description = adapter plugin functional testing +description = adapter functional testing skip_install = True passenv = * commands = {envpython} -m pytest {posargs} tests/functional/adapter @@ -22,7 +22,7 @@ deps = -e. [testenv:{filebased,py37,py38,py39,py310,py311,py}] -description = adapter plugin functional testing using file-based DBs +description = adapter functional testing using file-based DBs skip_install = True passenv = * commands = {envpython} -m pytest --profile=file {posargs} tests/functional/adapter @@ -31,10 +31,20 @@ deps = -e. [testenv:{fsspec,py37,py38,py39,py310,py311,py}] -description = adapter plugin fsspec testing +description = adapter fsspec testing skip_install = True passenv = * commands = {envpython} -m pytest {posargs} tests/functional/fsspec deps = -rdev-requirements.txt -e. + +[testenv:{plugins,py37,py38,py39,py310,py311,py}] +description = adapter plugin testing +skip_install = True +passenv = * +commands = {envpython} -m pytest {posargs} tests/functional/plugins +deps = + -rdev-requirements.txt + pyiceberg + -e.