Skip to content

Commit

Permalink
add join table and update view tests for filesystem
Browse files: browse the repository at this point in the history
  • Loading branch information
sh-rp committed Sep 30, 2024
1 parent bef50d7 commit b13e492
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 16 deletions.
3 changes: 3 additions & 0 deletions dlt/destinations/impl/filesystem/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ def create_view_for_tables(self, tables: List[str]) -> None:
for table_name in tables:
if table_name in self._existing_views:
continue
if table_name not in self.fs_client.schema.tables:
# skip tables that are not in the schema: no view is created for them
continue
self._existing_views.append(table_name)

folder = self.fs_client.get_table_dir(table_name)
Expand Down
2 changes: 1 addition & 1 deletion dlt/helpers/streamlit_app/pages/load_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def write_load_status_page(pipeline: Pipeline) -> None:
)

if loads_df is not None:
selected_load_id = st.selectbox("Select load id", loads_df)
selected_load_id: str = st.selectbox("Select load id", loads_df)
schema = pipeline.default_schema

st.markdown("**Number of loaded rows:**")
Expand Down
75 changes: 60 additions & 15 deletions tests/load/test_read_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
GCS_BUCKET,
SFTP_BUCKET,
)
from pandas import DataFrame
from dlt.destinations import filesystem
from dlt.destinations.impl.filesystem.sql_client import FilesystemSqlClient


def _run_dataset_checks(
Expand Down Expand Up @@ -63,7 +63,22 @@ def items():
for i in range(total_records)
]

return [items]
# Second resource, joined against `items` further down: yields one row per i in
# range(total_records) with id = i and double_id = i * 2; both columns are hinted as bigint.
@dlt.resource(
columns={
"id": {"data_type": "bigint"},
"double_id": {"data_type": "bigint"},
}
)
def double_items():
yield from [
{
"id": i,
"double_id": i * 2,
}
for i in range(total_records)
]

return [items, double_items]

# run source
s = source()
Expand Down Expand Up @@ -142,19 +157,6 @@ def items():
ids = reduce(lambda a, b: a + b, [[item[0] for item in chunk] for chunk in chunks])
assert set(ids) == set(range(total_records))

# simple check that query also works
tname = pipeline.sql_client().make_qualified_table_name("items")
query_relationship = pipeline._dataset()(f"select * from {tname} where id < 20")

# we selected the first 20
table = query_relationship.arrow()
assert table.num_rows == 20

# check unqualified table name
query_relationship = pipeline._dataset()("select * from items where id < 20")
table = query_relationship.arrow()
assert table.num_rows == 20

# check that hints are carried over to arrow table
expected_decimal_precision = 10
expected_decimal_precision_2 = 12
Expand All @@ -171,6 +173,49 @@ def items():
== expected_decimal_precision_2
)

# simple check that query also works
tname = pipeline.sql_client().make_qualified_table_name("items")
query_relationship = pipeline._dataset()(f"select * from {tname} where id < 20")

# we selected the first 20
table = query_relationship.arrow()
assert table.num_rows == 20

# check join query
tdname = pipeline.sql_client().make_qualified_table_name("double_items")
query = (
f"SELECT i.id, di.double_id FROM {tname} as i JOIN {tdname} as di ON (i.id = di.id) WHERE"
" i.id < 20 ORDER BY i.id ASC"
)
join_relationship = pipeline._dataset()(query)
table = join_relationship.fetchall()
assert len(table) == 20
assert table[0] == (0, 0)
assert table[5] == (5, 10)
assert table[10] == (10, 20)

# special filesystem sql checks
if destination_config.destination_type == "filesystem":
# check we can create new tables from the views
with pipeline.sql_client() as c:
c.create_view_for_tables(["items", "double_items"])
c.execute_sql(
"CREATE TABLE items_joined AS (SELECT i.id, di.double_id FROM items as i JOIN"
" double_items as di ON (i.id = di.id));"
)
with c.execute_query("SELECT * FROM items_joined ORDER BY id ASC;") as cursor:
joined_table = cursor.fetchall()
assert len(joined_table) == total_records
assert joined_table[0] == (0, 0)
assert joined_table[5] == (5, 10)
assert joined_table[10] == (10, 20)

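# inserting values into a view should fail with a clear error message
# NOTE(review): if the INSERT raises nothing, this check passes silently - consider pytest.raises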
try:
c.execute_sql("INSERT INTO double_items VALUES (1, 2)")
except Exception as exc:
assert "double_items is not an table" in str(exc)


@pytest.mark.essential
@pytest.mark.parametrize(
Expand Down

0 comments on commit b13e492

Please sign in to comment.