Skip to content

Commit

Permalink
improves normalization tests, improves docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Oct 11, 2024
1 parent 46cdd21 commit dd11f6b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
12 changes: 9 additions & 3 deletions dlt/common/libs/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ def should_normalize_arrow_schema(
naming: NamingConvention,
add_load_id: bool = False,
) -> Tuple[bool, Mapping[str, str], Dict[str, str], Dict[str, bool], bool, TTableSchemaColumns]:
"""Figure out if any of the normalization steps must be executed. This prevents
arrow tables from being rewritten when no changes are needed. Refer to `normalize_py_arrow_item`
for a list of normalizations. Note that `columns` must be already normalized.
"""
rename_mapping = get_normalized_arrow_fields_mapping(schema, naming)
# no clashes in rename ensured above
rev_mapping = {v: k for k, v in rename_mapping.items()}
nullable_mapping = {k: is_nullable_column(v) for k, v in columns.items()}
# All fields from arrow schema that have nullable set to different value than in columns
Expand Down Expand Up @@ -301,7 +306,8 @@ def normalize_py_arrow_item(
caps: DestinationCapabilitiesContext,
load_id: Optional[str] = None,
) -> TAnyArrowItem:
"""Normalize arrow `item` schema according to the `columns`.
"""Normalize arrow `item` schema according to the `columns`. Note that
columns must be already normalized.
1. arrow schema field names will be normalized according to `naming`
2. arrows columns will be reordered according to `columns`
Expand Down Expand Up @@ -366,14 +372,14 @@ def normalize_py_arrow_item(

def get_normalized_arrow_fields_mapping(schema: pyarrow.Schema, naming: NamingConvention) -> StrStr:
    """Normalize arrow schema field names and return a mapping from original to normalized name.

    Args:
        schema: Arrow schema whose fields are iterated; only each field's `name` is read.
        naming: Naming convention; its `normalize_path` is applied to every field name.

    Returns:
        Mapping of original field name -> normalized field name, in schema field order.

    Raises:
        NameNormalizationCollision: if two or more distinct field names normalize
            to the same name.
    """
    # columns in the arrow table may be already normalized, so use "normalize_path":
    # this is compatible with how regular columns are normalized in dlt.Schema
    norm_f = naming.normalize_path
    name_mapping = {n.name: norm_f(n.name) for n in schema}
    # verify that names normalize uniquely: fewer distinct normalized names than
    # original names means at least two originals collapsed into one
    normalized_names = set(name_mapping.values())
    if len(name_mapping) != len(normalized_names):
        # report normalized names in field order (not as a set) so that they line up
        # with the original names and the colliding entries stay visible
        raise NameNormalizationCollision(
            f"Arrow schema fields normalized from:\n{list(name_mapping.keys())}:\nto:\n"
            f" {list(name_mapping.values())}"
        )
    return name_mapping
Expand Down
13 changes: 13 additions & 0 deletions tests/pipeline/test_pipeline_extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,15 +519,18 @@ def test_parquet_with_flattened_columns() -> None:

# make sure flattened columns exist
assert "issue__reactions__url" in pipeline.default_schema.tables["events"]["columns"]
assert "issue_reactions_url" not in pipeline.default_schema.tables["events"]["columns"]

events_table = pipeline._dataset().events.arrow()
assert "issue__reactions__url" in events_table.schema.names
assert "issue_reactions_url" not in events_table.schema.names

# load table back into filesystem
info = pipeline.run(events_table, table_name="events2", loader_file_format="parquet")
assert_load_info(info)

assert "issue__reactions__url" in pipeline.default_schema.tables["events2"]["columns"]
assert "issue_reactions_url" not in pipeline.default_schema.tables["events2"]["columns"]

# load back into original table
info = pipeline.run(events_table, table_name="events", loader_file_format="parquet")
Expand All @@ -538,6 +541,16 @@ def test_parquet_with_flattened_columns() -> None:
# double row count
assert events_table.num_rows * 2 == events_table_new.num_rows

# now add a column that clearly needs normalization
updated_events_table = events_table_new.append_column(
"Clearly!Normalize", events_table_new["issue__reactions__url"]
)
info = pipeline.run(updated_events_table, table_name="events", loader_file_format="parquet")
assert_load_info(info)

assert "clearly_normalize" in pipeline.default_schema.tables["events"]["columns"]
assert "Clearly!Normalize" not in pipeline.default_schema.tables["events"]["columns"]


def test_resource_file_format() -> None:
os.environ["RESTORE_FROM_DESTINATION"] = "False"
Expand Down

0 comments on commit dd11f6b

Please sign in to comment.