feat: add incremental lag for datetime, int, and float cursors #1957

Merged
rudolfix merged 20 commits into devel from feat/970-add-incremental-lag
Oct 29, 2024

netlify bot commented Oct 15, 2024

Deploy Preview for dlt-hub-docs ready!

Name Link
🔨 Latest commit 401587d
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/6720f76d921cf100087f8c3e
😎 Deploy Preview https://deploy-preview-1957--dlt-hub-docs.netlify.app

@rudolfix rudolfix left a comment

@donotpush the way you solve it is good! if you apply lag to last_value prop all the code should work.

there are a few things, mostly a result of imperfect code, that you need to pay attention to:

  • def parse_native_representation(self, native_value: Any) -> None: - you need to transfer lag manually
  • make sure merge works (lag is copied via universal mechanism)
  • we have ensure_pendulum_datetime that converts strings into datetimes (also ints and floats) - maybe it is better to use that. you should also convert the result back to a string in the same representation, or the comparison will stop working. tl;dr: _apply_lag must return exactly the same type as its input
  • we should also support date. then lag is in days

one more thing which IMO will be super helpful: allow defining lag in the rest_api toolkit:

    start_param: str
    end_param: Optional[str]

you can add lag here and hopefully it will be passed to incremental
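
The "same type in, same type out" requirement from the list above can be sketched as follows. This is only an illustration, not the PR's code: the name apply_lag_sketch is hypothetical, lag is assumed to be in seconds for datetimes (the review only states days for dates), and only two fixed string formats are handled with the standard library instead of ensure_pendulum_datetime.

```python
from datetime import date, datetime, timedelta
from typing import Union

CursorValue = Union[str, datetime, date, int, float]

# Assumption: only these two string layouts are recognised in this sketch.
_STRING_FORMATS = (
    ("%Y-%m-%dT%H:%M:%SZ", "seconds"),  # datetime string, lag in seconds
    ("%Y-%m-%d", "days"),               # date string, lag in days
)


def apply_lag_sketch(value: CursorValue, lag: float) -> CursorValue:
    """Move a cursor value back by `lag` and return the same type as the input."""
    # datetime must be checked before date because datetime subclasses date
    if isinstance(value, datetime):
        return value - timedelta(seconds=lag)
    if isinstance(value, date):
        return value - timedelta(days=lag)
    if isinstance(value, int):
        return value - int(lag)
    if isinstance(value, float):
        return value - lag
    if isinstance(value, str):
        for fmt, unit in _STRING_FORMATS:
            try:
                parsed = datetime.strptime(value, fmt)
            except ValueError:
                continue
            delta = timedelta(seconds=lag) if unit == "seconds" else timedelta(days=lag)
            # format back with the same layout so string comparison keeps working
            return (parsed - delta).strftime(fmt)
        raise ValueError(f"unsupported cursor string format: {value!r}")
    raise TypeError(f"unsupported cursor type: {type(value).__name__}")
```

Formatting the string result with the layout it was parsed from is what keeps later comparisons against other cursor strings meaningful.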

@rudolfix rudolfix left a comment

please see my comments. on top of that:

  1. I'll ask @burnash to help you with adding lag to rest api. Or maybe we'll propose a commit to speed it up. just fix the tests we have so we know it all works
  2. I'm not sure whether you test the lag on string and date fields. those should be quite simple tests, i.e. for append-only resources that detect the expected duplicates caused by lag. see my comments

dlt/common/time.py (resolved)
name = "events"

@dlt.resource(name=name, primary_key="id")
def r1(_=dlt.sources.incremental("created_at")):
Collaborator

the concept of test is good. but IMO you should create just one resource with merge write disposition that you call with the lag you want from the very beginning. this is how people will use it IMO.

emulate returning "updated_events" on the second call to it ie. via some kind of nonlocal flag that tells to add those events on the second time.
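
A minimal sketch of that pattern, written in plain Python rather than against dlt itself: make_events and run_merge are hypothetical names, the first three rows are made-up sample data, and run_merge is a toy stand-in for a merge load keyed on id with a lag in seconds. It has no internal deduplication, so it shows the intended outcome, not what the pipeline currently does.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Iterator, Optional

Event = Dict[str, Any]
TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def make_events() -> Callable[[], Iterator[Event]]:
    """Build a source that adds the updated event from its second call onwards."""
    already_called = False

    def events() -> Iterator[Event]:
        nonlocal already_called
        include_updates = already_called
        already_called = True
        # Hypothetical sample rows for illustration only.
        yield {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "created"}
        yield {"id": 2, "created_at": "2023-03-03T01:00:01Z", "event": "created"}
        yield {"id": 3, "created_at": "2023-03-03T02:00:01Z", "event": "created"}
        if include_updates:
            yield {"id": 3, "created_at": "2023-03-03T02:00:01Z", "event": "updated"}

    return events


def run_merge(
    events: Callable[[], Iterator[Event]],
    table: Dict[int, Event],
    last_value: Optional[datetime],
    lag_seconds: float,
) -> Optional[datetime]:
    """Toy merge load: keep rows at or after the lagged cursor, upsert by id."""
    start = None if last_value is None else last_value - timedelta(seconds=lag_seconds)
    for row in events():
        ts = datetime.strptime(row["created_at"], TS_FORMAT)
        if start is not None and ts < start:
            continue  # older than the attribution window, skip
        table[row["id"]] = row  # merge: a later row replaces the earlier one
        last_value = ts if last_value is None else max(last_value, ts)
    return last_value


events = make_events()
table: Dict[int, Event] = {}
cursor = run_merge(events, table, None, lag_seconds=3600)    # first run
cursor = run_merge(events, table, cursor, lag_seconds=3600)  # second run sees the update
```

After the second run the row with id=3 should carry the "updated" event, which is the condition the real test needs to assert.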

the results of the test look good but please test two additional things

  • that you do not apply lag to "initial_value" (set it in incremental)
  • there's IMO issue with internal deduplication. please test for updated_events that update id=3:
    {
        "id": 3,
        "created_at": "2023-03-03T02:00:01Z",
        "event": "updated",
    }
IMO it will not be included into final result because we'll deduplicate it. you should IMO disable deduplication when lag is defined
```py
@property
def deduplication_disabled(self) -> bool:
    """Skip deduplication when length of the key is 0"""
    return isinstance(self.primary_key, (list, tuple)) and len(self.primary_key) == 0
```

Collaborator Author

You are right, there is a deduplication bug - I have captured it in the tests, but surprisingly setting deduplication_disabled to True doesn't solve the issue. Any tips on how to solve it?


I have refactored the code and tests with your suggestions.


Question, what do you mean with the following point?

  • that you do not apply lag to "initial_value" (set it in incremental)

In the current implementation lag only applies to last_value, but I see that last_value is initially set to initial_value in get_state.

Do I need to skip the _apply_lag call when last_value == initial_value?

    def last_value(self) -> Optional[TCursorValue]:
        s = self.get_state()

        if self.lag is not None:
            return self._apply_lag(s["last_value"])

        return s["last_value"]  # type: ignore

name = "items"

@dlt.resource(name=name, primary_key="id")
def r1(_=dlt.sources.incremental("id")):
Collaborator

I have similar comments as to the test below:

  1. use just one resource with "append" write disposition. you can return the same response each time
  2. call the resource several times with different lags (use apply_hints to replace incremental)
  3. test if expected elements got duplicated
  4. IMO you'll have a deduplication bug that I describe above
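
The duplicate check from the list above can be sketched in plain Python. Everything here is hypothetical: RESPONSE is made-up data, run_append is a toy append-only load rather than a dlt pipeline, and the boundary rule (a row is re-extracted when its id is strictly greater than last_value - lag) is an assumption of this model.

```python
from typing import Dict, List, Optional

Item = Dict[str, int]

# Hypothetical fixed response: the same five items on every call.
RESPONSE: List[Item] = [{"id": i} for i in range(1, 6)]


def run_append(table: List[Item], last_value: Optional[int], lag: int) -> Optional[int]:
    """Toy append-only load with an int cursor on "id"; returns the new cursor."""
    start = None if last_value is None else last_value - lag
    for item in RESPONSE:
        if start is None or item["id"] > start:
            table.append(item)
            last_value = item["id"] if last_value is None else max(last_value, item["id"])
    return last_value


def count_duplicates(lag: int) -> int:
    """Load the same response twice and count rows that were appended twice."""
    table: List[Item] = []
    cursor = run_append(table, None, lag)
    run_append(table, cursor, lag)
    ids = [item["id"] for item in table]
    return len(ids) - len(set(ids))
```

With consecutive ids the number of duplicates equals the lag until it covers the whole response, so count_duplicates(0), count_duplicates(2) and count_duplicates(10) are easy values to assert on.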

Collaborator Author

Implemented your comments except for number 2:

  • call the resource several times with different lags (use apply_hints to replace incremental)

I have used @pytest.mark.parametrize("lag", [-1, 10]) to execute the code with different lags instead of using apply_hints.

Also facing the deduplication bug - more info above in the other comments.

donotpush commented Oct 22, 2024

I’ve added lag in parse_native_representation, added a simple unit test for merge too.

The rest of the implementation is complete, except for the rest_api.

@donotpush donotpush requested a review from rudolfix October 22, 2024 15:51
@donotpush

I have introduced a bug in IncrementalTransform in my last commit - fixing it.

donotpush commented Oct 23, 2024

Fixed the tests:

  • test_pipeline_resource_incremental_datetime_lag
  • test_pipeline_resource_incremental_int_lag

Please check them carefully, because I never actually managed to reproduce the deduplication bug that you mentioned - I was wrong in my previous comments.

Even without modifying the deduplication_disabled code my tests pass, which indicates that I should revert my changes to deduplication_disabled.

@rudolfix rudolfix left a comment

date format detection function is really cool! I think it will be useful in other places. we still need more tests. I expect this feature to be frequently used and all edge cases will be quickly explored. missing tests:

  • can we test how min last value function behaves?
  • please make sure lag does not impact initial values
  • please test if lag is disabled for custom value functions
  • can we test lag on a date and float?

also please move incremental tests to tests/extract/incremental. it is sufficient to just run them on duckdb which is available there

dlt/common/time.py (resolved)
dlt/extract/incremental/__init__.py (outdated, resolved)
tests/load/pipeline/test_pipelines.py (outdated, resolved)
tests/load/pipeline/test_pipelines.py (outdated, resolved)
@donotpush

@rudolfix implemented all the requested changes.

Please review carefully, especially the edge cases.

@donotpush donotpush requested a review from rudolfix October 28, 2024 23:56
@donotpush donotpush marked this pull request as ready for review October 29, 2024 00:12
rudolfix previously approved these changes Oct 29, 2024

@rudolfix rudolfix left a comment

LGTM!
we still need docs before we merge

I did a few fixes please check it out @donotpush

  • I moved lag functions to a separate module. makes incremental code simpler. allows to write unit tests for apply_lag for various edge cases (also done)
  • if self.end_value: - this check is also false for 0, which is a valid end value. @donotpush it is OK to use those implicit casts to bool, but you must always make sure this is exactly what you want. lag==0 is OK to skip. end_value==0 is a valid value, i.e. we should skip lag for it as well (it is tested)
  • I refactored the code that makes sure that lagged values are within the range of initial_value by just using last_value_func.
  • type: ignore - always give a reason in brackets
  • added lag as field to dataclass, not via property
  • extended TypedDict to support lag in rest-api
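
The end_value and initial_value points in this list can be sketched together. This is a rough illustration under assumptions, not the merged code: effective_start is a hypothetical name, the cursor is numeric, and last_value_func is assumed to be max.

```python
from typing import Callable, Optional, Sequence, Union

Number = Union[int, float]


def effective_start(
    last_value: Optional[Number],
    initial_value: Optional[Number],
    end_value: Optional[Number],
    lag: Optional[Number],
    last_value_func: Callable[[Sequence[Number]], Number] = max,
) -> Optional[Number]:
    """Return the cursor value to resume from, with lag applied where allowed."""
    # lag is None and lag == 0 both mean "no lag", so a truthiness check is intended
    if not lag:
        return last_value
    # end_value == 0 is a valid bound, so compare against None explicitly;
    # `if end_value:` would wrongly treat 0 as "not set"
    if end_value is not None:
        return last_value
    if last_value is None:
        return None
    lagged = last_value - lag
    if initial_value is not None:
        # with max, the lagged value can never drop below initial_value
        lagged = last_value_func((lagged, initial_value))
    return lagged
```

On a first run last_value equals initial_value, so the clamp returns initial_value unchanged, which is the "do not apply lag to initial_value" behaviour asked for earlier in the review.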

we'll merge this once docs are ready

@rudolfix rudolfix merged commit 3fc9fe4 into devel Oct 29, 2024
59 of 61 checks passed
@rudolfix rudolfix deleted the feat/970-add-incremental-lag branch October 29, 2024 18:25
Development

Successfully merging this pull request may close these issues.

add lag / attribution window to incremental
2 participants