logs warning if deduplication state is large (#1877)
* logs warning if deduplication state is large

* tests for ALL_TEST_DATA_ITEM_FORMATS

* improves error message, refactors magic number

* Make threshold configurable, display the duplication warning only once, update the warning message, change the test to check for single warning

* Move `duplicate_cursor_warning_threshold` to a ClassVar

---------

Co-authored-by: Anton Burnashev <anton.burnashev@gmail.com>
willi-mueller and burnash authored Nov 13, 2024
1 parent 1346b3b commit 592c797
Showing 2 changed files with 46 additions and 0 deletions.
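
The threshold is introduced as a class-level setting, `duplicate_cursor_warning_threshold`, in the diff below. A minimal sketch of tuning it, assuming the ClassVar is meant to be overridden directly; the value 500 is only an illustration:

    from dlt.extract.incremental import Incremental

    # Raise the class-level threshold so the warning only fires once the
    # deduplication state holds more than 500 hashes (the default is 200).
    Incremental.duplicate_cursor_warning_threshold = 500
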
17 changes: 17 additions & 0 deletions dlt/extract/incremental/__init__.py
@@ -113,6 +113,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
allow_external_schedulers: bool = False
on_cursor_value_missing: OnCursorValueMissing = "raise"
lag: Optional[float] = None
duplicate_cursor_warning_threshold: ClassVar[int] = 200

# incremental acting as empty
EMPTY: ClassVar["Incremental[Any]"] = None
@@ -529,12 +530,28 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
transformer.compute_unique_value(row, self.primary_key)
for row in transformer.last_rows
)
initial_hash_count = len(self._cached_state.get("unique_hashes", []))
# add directly computed hashes
unique_hashes.update(transformer.unique_hashes)
self._cached_state["unique_hashes"] = list(unique_hashes)
final_hash_count = len(self._cached_state["unique_hashes"])

self._check_duplicate_cursor_threshold(initial_hash_count, final_hash_count)
return rows

def _check_duplicate_cursor_threshold(
self, initial_hash_count: int, final_hash_count: int
) -> None:
if initial_hash_count <= Incremental.duplicate_cursor_warning_threshold < final_hash_count:
logger.warning(
f"Large number of records ({final_hash_count}) sharing the same value of "
f"cursor field '{self.cursor_path}'. This can happen if the cursor "
"field has a low resolution (e.g., only stores dates without times), "
"causing many records to share the same cursor value. "
"Consider using a cursor column with higher resolution to reduce "
"the deduplication state size."
)


Incremental.EMPTY = Incremental[Any]()
Incremental.EMPTY.__is_resolved__ = True
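
The crossing check in `_check_duplicate_cursor_threshold` is what limits the warning to the batch that pushes the state over the threshold: once the hash count is already above it, the condition no longer holds. A small illustration with made-up counts:

    threshold = 200
    # first batch: state grows from 0 to 201 hashes -> crosses the threshold, warns
    print(0 <= threshold < 201)    # True
    # next batch: state grows from 201 to 301 hashes -> already past it, stays quiet
    print(201 <= threshold < 301)  # False
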
29 changes: 29 additions & 0 deletions tests/extract/test_incremental.py
@@ -3539,3 +3539,32 @@ def test_apply_lag() -> None:
assert apply_lag(2, 0, 1, max) == 0
assert apply_lag(1, 2, 1, min) == 2
assert apply_lag(2, 2, 1, min) == 2


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
@pytest.mark.parametrize("primary_key", ["id", None])
def test_warning_large_deduplication_state(item_type: TestDataItemFormat, primary_key, mocker):
@dlt.resource(primary_key=primary_key)
def some_data(
created_at=dlt.sources.incremental("created_at"),
):
# Cross the default threshold of 200
yield data_to_item_format(
item_type,
[{"id": i, "created_at": 1} for i in range(201)],
)
# Second batch adds more items but shouldn't trigger warning
yield data_to_item_format(
item_type,
[{"id": i, "created_at": 1} for i in range(201, 301)],
)

logger_spy = mocker.spy(dlt.common.logger, "warning")
p = dlt.pipeline(pipeline_name=uniq_id())
p.extract(some_data(1))

# Verify warning was called exactly once
warning_calls = [
call for call in logger_spy.call_args_list if "Large number of records" in call.args[0]
]
assert len(warning_calls) == 1
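
Outside the test suite, the same behavior should be observable in a plain pipeline run. A minimal repro sketched from the test above (resource and pipeline names are illustrative; assumes dlt is installed):

    import dlt

    @dlt.resource(primary_key="id")
    def events(created_at=dlt.sources.incremental("created_at")):
        # 250 rows share one date-only cursor value, so the deduplication
        # state grows past the default threshold of 200 hashes
        yield [{"id": i, "created_at": "2024-11-13"} for i in range(250)]

    pipeline = dlt.pipeline(pipeline_name="dedup_warning_demo")
    pipeline.extract(events())
    # expected: a single "Large number of records ..." warning in the logs
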
