Skip to content

Commit

Permalink
Format
Browse files: browse the repository at this point in the history
  • Loading branch information
steinitzu committed Nov 21, 2024
1 parent a94c115 commit 867ee67
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 16 deletions.
11 changes: 7 additions & 4 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,6 @@ def merge_columns(
if columns_partial is False:
raise NotImplementedError("columns_partial must be False for merge_columns")


# remove incomplete columns in table that are complete in diff table
for col_name, column_b in columns_b.items():
column_a = columns_a.get(col_name)
Expand Down Expand Up @@ -549,11 +548,15 @@ def merge_diff(table: TTableSchema, table_diff: TPartialTableSchema) -> TPartial
* nothing gets deleted
"""

incremental_a_col = get_first_column_name_with_prop(table, 'incremental', include_incomplete=True)
incremental_a_col = get_first_column_name_with_prop(
table, "incremental", include_incomplete=True
)
if incremental_a_col:
incremental_b_col = get_first_column_name_with_prop(table_diff, 'incremental', include_incomplete=True)
incremental_b_col = get_first_column_name_with_prop(
table_diff, "incremental", include_incomplete=True
)
if incremental_b_col:
table['columns'][incremental_a_col].pop('incremental')
table["columns"][incremental_a_col].pop("incremental")

# add new columns when all checks passed
updated_columns = merge_columns(table["columns"], table_diff["columns"])
Expand Down
1 change: 0 additions & 1 deletion dlt/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,4 +635,3 @@ def __getattribute__(self, name: str) -> Any:
raise RuntimeError("This instance has been dropped and cannot be used anymore.")

return DefunctClass

2 changes: 1 addition & 1 deletion dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def _compute_metrics(self, load_id: str, source: DltSource) -> ExtractMetrics:
hint = hint.incremental
# sometimes internal incremental is not bound
if hint:
hints[name] = hint.to_table_hint()
hints[name] = hint.to_table_hint() # type: ignore[attr-defined]
continue
if name == "original_columns":
# this is the original type of the columns, i.e. a Pydantic model
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab
if isinstance(incremental, IncrementalResourceWrapper):
incremental = incremental.incremental
if incremental:
self._hints['incremental'] = incremental
self._hints["incremental"] = incremental

table_schema = super().compute_table_schema(item, meta)

Expand Down
18 changes: 9 additions & 9 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -3766,18 +3766,18 @@ def test_incremental_table_hint_merged_columns(use_dict: bool) -> None:
)
)
def some_data():
yield [{"col_a": i, "foo": i+ 2, "col_b": i + 1, "bar": i+3} for i in range(10)]
yield [{"col_a": i, "foo": i + 2, "col_b": i + 1, "bar": i + 3} for i in range(10)]

pipeline = dlt.pipeline(pipeline_name=uniq_id())
pipeline.extract(some_data())

table_schema = pipeline.default_schema.tables["some_data"]
assert table_schema['columns']['col_a']['incremental'] == {
assert table_schema["columns"]["col_a"]["incremental"] == {
"allow_external_schedulers": False,
"cursor_path": "col_a",
"initial_value": 3,
"last_value_func": "min",
"on_cursor_value_missing": "raise"
"on_cursor_value_missing": "raise",
}

rs = some_data()
Expand All @@ -3792,13 +3792,13 @@ def some_data():
table_schema_2 = pipeline.default_schema.tables["some_data"]

# Only one column should have the hint
assert 'incremental' not in table_schema_2['columns']['col_a']
assert table_schema_2['columns']['col_b']['incremental'] == {
assert "incremental" not in table_schema_2["columns"]["col_a"]
assert table_schema_2["columns"]["col_b"]["incremental"] == {
"allow_external_schedulers": False,
"cursor_path": "col_b",
"initial_value": 5,
"last_value_func": "max",
"on_cursor_value_missing": "raise"
"on_cursor_value_missing": "raise",
}


Expand All @@ -3810,13 +3810,13 @@ def test_incremental_column_hint_cursor_is_not_column(use_dict: bool):
)
)
def some_data():
yield [{"col_a": i, "foo": i+ 2, "col_b": i + 1, "bar": i+3} for i in range(10)]
yield [{"col_a": i, "foo": i + 2, "col_b": i + 1, "bar": i + 3} for i in range(10)]

pipeline = dlt.pipeline(pipeline_name=uniq_id())

pipeline.extract(some_data())

table_schema = pipeline.default_schema.tables["some_data"]

for col in table_schema['columns'].values():
assert 'incremental' not in col
for col in table_schema["columns"].values():
assert "incremental" not in col

0 comments on commit 867ee67

Please sign in to comment.