Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add support for dbt sources external location defined in meta #2999

Merged
merged 3 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion sqlmesh/dbt/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ def source(package: str, name: str) -> t.Optional[BaseRelation]:
if relation_info is None:
logger.debug("Could not resolve source package='%s' name='%s'", package, name)
return None

return _relation_info_to_relation(relation_info, api.Relation, api.quote_policy)

return source
Expand Down
9 changes: 9 additions & 0 deletions sqlmesh/dbt/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class SourceConfig(GeneralConfig):
loaded_at_field: t.Optional[str] = None
quoting: t.Dict[str, t.Optional[bool]] = {}
external: t.Optional[t.Dict[str, t.Any]] = {}
source_meta: t.Optional[t.Dict[str, t.Any]] = {}
erindru marked this conversation as resolved.
Show resolved Hide resolved
columns: t.Dict[str, ColumnConfig] = {}

_canonical_name: t.Optional[str] = None
Expand Down Expand Up @@ -86,12 +87,20 @@ def canonical_name(self, context: DbtContext) -> str:

@property
def relation_info(self) -> AttributeDict:
    """Return the attributes describing this source as an external relation.

    If ``source_meta`` contains a truthy ``external_location`` entry, every
    ``{name}`` placeholder in it is replaced with ``self.table_name`` and the
    result is exposed under the ``external`` key.
    """
    # A missing or empty ``source_meta`` simply yields no external location.
    meta = self.source_meta or {}
    location_template = meta.get("external_location", None)

    info: t.Dict[str, t.Any] = {
        "database": self.database,
        "schema": self.schema_,
        "identifier": self.table_name,
        "type": RelationType.External.value,
        "quote_policy": AttributeDict(self.quoting),
    }
    if location_template:
        # Added last so the key order matches the base attributes followed by extras.
        info["external"] = location_template.replace("{name}", self.table_name)

    return AttributeDict(info)
9 changes: 8 additions & 1 deletion tests/dbt/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,11 @@ def test_variables(assert_exp_eq, sushi_test_project):
def test_source_config(sushi_test_project: Project):
source_configs = sushi_test_project.packages["sushi"].sources
assert set(source_configs) == {
"streaming.items",
"streaming.orders",
"parquet_file.items",
"streaming.order_items",
"streaming.items",
"parquet_file.orders",
}

expected_config = {
Expand All @@ -359,6 +361,11 @@ def test_source_config(sushi_test_project: Project):
== "raw.order_items"
)

assert (
source_configs["parquet_file.orders"].canonical_name(sushi_test_project.context)
== "read_parquet('path/to/external/orders.parquet')"
)


def test_seed_config(sushi_test_project: Project, mocker: MockerFixture):
seed_configs = sushi_test_project.packages["sushi"].seeds
Expand Down
34 changes: 34 additions & 0 deletions tests/dbt/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.manifest import ManifestHelper
from sqlmesh.dbt.profile import Profile
from sqlmesh.dbt.builtin import Api, _relation_info_to_relation
from sqlmesh.utils.jinja import MacroReference

pytestmark = pytest.mark.dbt
Expand Down Expand Up @@ -151,3 +152,36 @@ def test_variable_override():
variable_overrides={"top_waiters:limit": 1, "start": "2020-01-01"},
)
assert helper.models()["top_waiters"].limit_value == 1


@pytest.mark.xdist_group("dbt_manifest")
def test_source_meta_external_location():
    """Check that ``external_location`` in source meta reaches the rendered relation."""
    fixture_dir = Path("tests/fixtures/dbt/sushi_test")
    target = Profile.load(DbtContext(fixture_dir)).target

    manifest = ManifestHelper(
        fixture_dir,
        fixture_dir,
        "sushi",
        target,
        variable_overrides={"start": "2020-01-01"},
    )
    sources = manifest.sources()

    # The stored meta keeps the `{name}` placeholder; relation_info fills it in.
    orders_source = sources["parquet_file.orders"]
    assert orders_source.source_meta == {
        "external_location": "read_parquet('path/to/external/{name}.parquet')"
    }
    assert (
        orders_source.relation_info.external == "read_parquet('path/to/external/orders.parquet')"
    )

    api = Api("duckdb")
    items_source = sources["parquet_file.items"]
    assert (
        items_source.relation_info.external == "read_parquet('path/to/external/items.parquet')"
    )

    # The identifier stays the table name while rendering uses the external location.
    relation = _relation_info_to_relation(
        items_source.relation_info, api.Relation, api.quote_policy
    )
    assert relation.identifier == "items"
    assert relation.render() == "read_parquet('path/to/external/items.parquet')"
7 changes: 7 additions & 0 deletions tests/fixtures/dbt/sushi_test/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@ sources:
- name: items
- name: orders
- name: order_items

- name: parquet_file
meta:
external_location: "read_parquet('path/to/external/{name}.parquet')"
tables:
- name: items
- name: orders