diff --git a/dlt/destinations/impl/filesystem/sql_client.py b/dlt/destinations/impl/filesystem/sql_client.py index 2f32e19b6f..525d8be9e8 100644 --- a/dlt/destinations/impl/filesystem/sql_client.py +++ b/dlt/destinations/impl/filesystem/sql_client.py @@ -146,6 +146,9 @@ def create_view_for_tables(self, tables: List[str]) -> None: for table_name in tables: if table_name in self._existing_views: continue + if table_name not in self.fs_client.schema.tables: + # unknown tables will not be created + continue self._existing_views.append(table_name) folder = self.fs_client.get_table_dir(table_name) diff --git a/dlt/helpers/streamlit_app/pages/load_info.py b/dlt/helpers/streamlit_app/pages/load_info.py index ee13cf2531..699e786410 100644 --- a/dlt/helpers/streamlit_app/pages/load_info.py +++ b/dlt/helpers/streamlit_app/pages/load_info.py @@ -27,7 +27,7 @@ def write_load_status_page(pipeline: Pipeline) -> None: ) if loads_df is not None: - selected_load_id = st.selectbox("Select load id", loads_df) + selected_load_id: str = st.selectbox("Select load id", loads_df) schema = pipeline.default_schema st.markdown("**Number of loaded rows:**") diff --git a/tests/load/test_read_interfaces.py b/tests/load/test_read_interfaces.py index d9a25382e6..0f6c1899db 100644 --- a/tests/load/test_read_interfaces.py +++ b/tests/load/test_read_interfaces.py @@ -14,8 +14,8 @@ GCS_BUCKET, SFTP_BUCKET, ) -from pandas import DataFrame from dlt.destinations import filesystem +from dlt.destinations.impl.filesystem.sql_client import FilesystemSqlClient def _run_dataset_checks( @@ -63,7 +63,22 @@ def items(): for i in range(total_records) ] - return [items] + @dlt.resource( + columns={ + "id": {"data_type": "bigint"}, + "double_id": {"data_type": "bigint"}, + } + ) + def double_items(): + yield from [ + { + "id": i, + "double_id": i * 2, + } + for i in range(total_records) + ] + + return [items, double_items] # run source s = source() @@ -142,19 +157,6 @@ def items(): ids = reduce(lambda a, b: a + b, [[item[0] for item in chunk] for chunk in chunks]) assert set(ids) == set(range(total_records)) - # simple check that query also works - tname = pipeline.sql_client().make_qualified_table_name("items") - query_relationship = pipeline._dataset()(f"select * from {tname} where id < 20") - - # we selected the first 20 - table = query_relationship.arrow() - assert table.num_rows == 20 - - # check unqualified table name - query_relationship = pipeline._dataset()("select * from items where id < 20") - table = query_relationship.arrow() - assert table.num_rows == 20 - # check that hints are carried over to arrow table expected_decimal_precision = 10 expected_decimal_precision_2 = 12 @@ -171,6 +173,49 @@ def items(): == expected_decimal_precision_2 ) + # simple check that query also works + tname = pipeline.sql_client().make_qualified_table_name("items") + query_relationship = pipeline._dataset()(f"select * from {tname} where id < 20") + + # we selected the first 20 + table = query_relationship.arrow() + assert table.num_rows == 20 + + # check join query + tdname = pipeline.sql_client().make_qualified_table_name("double_items") + query = ( + f"SELECT i.id, di.double_id FROM {tname} as i JOIN {tdname} as di ON (i.id = di.id) WHERE" + " i.id < 20 ORDER BY i.id ASC" + ) + join_relationship = pipeline._dataset()(query) + table = join_relationship.fetchall() + assert len(table) == 20 + assert table[0] == (0, 0) + assert table[5] == (5, 10) + assert table[10] == (10, 20) + + # special filesystem sql checks + if destination_config.destination_type == "filesystem": + # check we can create new tables from the views + with pipeline.sql_client() as c: + c.create_view_for_tables(["items", "double_items"]) + c.execute_sql( + "CREATE TABLE items_joined AS (SELECT i.id, di.double_id FROM items as i JOIN" + " double_items as di ON (i.id = di.id));" + ) + with c.execute_query("SELECT * FROM items_joined ORDER BY id ASC;") as cursor: + joined_table = cursor.fetchall() + assert len(joined_table) == total_records + assert joined_table[0] == (0, 0) + assert joined_table[5] == (5, 10) + assert joined_table[10] == (10, 20) + + # inserting values into a view should fail gracefully + try: + c.execute_sql("INSERT INTO double_items VALUES (1, 2)") + except Exception as exc: + assert "double_items is not an table" in str(exc) + @pytest.mark.essential @pytest.mark.parametrize(