feat: add incremental lag for datetime, int, and float cursors #1957

Merged
merged 20 commits on Oct 29, 2024

Changes from 3 commits
24 changes: 24 additions & 0 deletions dlt/common/time.py
@@ -1,5 +1,6 @@
import contextlib
import datetime # noqa: I251
import re
from typing import Any, Optional, Union, overload, TypeVar, Callable # noqa

from pendulum.parsing import (
@@ -154,6 +155,29 @@ def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time:
    raise TypeError(f"Cannot coerce {value} to a pendulum.Time object.")


def detect_datetime_format(value: str) -> Optional[str]:
    format_patterns = {
        re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$"): "%Y-%m-%dT%H:%M:%SZ",  # UTC 'Z'
        re.compile(
            r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}$"
        ): "%Y-%m-%dT%H:%M:%S%z",  # Timezone offset
        re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$"): "%Y-%m-%dT%H:%M:%S",  # No timezone
        re.compile(r"^\d{4}-\d{2}-\d{2}$"): "%Y-%m-%d",  # Date only
        re.compile(r"^\d{4}-\d{2}$"): "%Y-%m",  # Year and month
        re.compile(r"^\d{4}-W\d{2}$"): "%Y-W%W",  # Week-based date
        re.compile(r"^\d{4}-\d{3}$"): "%Y-%j",  # Ordinal date
        re.compile(r"^\d{4}$"): "%Y",  # Year only
    }

    # Match against each compiled regular expression
    for pattern, format_str in format_patterns.items():
        if pattern.match(value):
            return format_str

    # Return None if no pattern matches
    return None


def to_py_datetime(value: datetime.datetime) -> datetime.datetime:
    """Convert a pendulum.DateTime to a py datetime object.
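As a quick orientation, here is an illustrative sketch of what the new helper returns for a few example inputs (the values are made up; the expected formats are read off the patterns above):

```py
from dlt.common.time import detect_datetime_format

assert detect_datetime_format("2024-10-29T12:00:00Z") == "%Y-%m-%dT%H:%M:%SZ"  # UTC 'Z'
assert detect_datetime_format("2024-10-29T12:00:00+02:00") == "%Y-%m-%dT%H:%M:%S%z"  # offset
assert detect_datetime_format("2024-10-29") == "%Y-%m-%d"  # date only
assert detect_datetime_format("2024-W44") == "%Y-W%W"  # week-based date
assert detect_datetime_format("not-a-date") is None  # no pattern matches
```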
54 changes: 53 additions & 1 deletion dlt/extract/incremental/__init__.py
@@ -1,6 +1,7 @@
import os
from datetime import datetime # noqa: I251
from datetime import datetime, timedelta, date # noqa: I251
from typing import Generic, ClassVar, Any, Optional, Type, Dict
import dateutil.parser
from typing_extensions import get_origin, get_args

import inspect
@@ -9,6 +10,7 @@
from dlt.common import logger
from dlt.common.exceptions import MissingDependencyException
from dlt.common.pendulum import pendulum
from dlt.common.time import ensure_pendulum_datetime, detect_datetime_format
from dlt.common.jsonpath import compile_path
from dlt.common.typing import (
    TDataItem,
@@ -101,6 +103,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
The values passed explicitly to Incremental will be ignored.
Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded
on_cursor_value_missing: Specify what happens when the cursor_path does not exist in a record or a record has `None` at the cursor_path: raise, include, exclude
lag: Optional value used to define a lag or attribution window. For datetime cursors, this is interpreted as seconds. For other types, it uses the + or - operator depending on the last_value_func.
"""

# this is config/dataclass so declare members
@@ -111,6 +114,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
    row_order: Optional[TSortOrder] = None
    allow_external_schedulers: bool = False
    on_cursor_value_missing: OnCursorValueMissing = "raise"
    lag: Optional[float] = None

    # incremental acting as empty
    EMPTY: ClassVar["Incremental[Any]"] = None
@@ -126,6 +130,7 @@ def __init__(
        row_order: Optional[TSortOrder] = None,
        allow_external_schedulers: bool = False,
        on_cursor_value_missing: OnCursorValueMissing = "raise",
        lag: Optional[float] = None,
    ) -> None:
        # make sure that path is valid
        if cursor_path:
@@ -149,6 +154,8 @@

        self._cached_state: IncrementalColumnState = None
        """State dictionary cached on first access"""

        self.lag = lag
        super().__init__(lambda x: x)  # TODO:

        self.end_out_of_range: bool = False
@@ -289,6 +296,47 @@ def parse_native_representation(self, native_value: Any) -> None:
        # Passing bare value `incremental=44` gets parsed as initial_value
        self.initial_value = native_value

    def _apply_lag(self, value: TCursorValue) -> TCursorValue:
        if self.lag is None:
            return value

        # Determine if the input is originally a string and capture its format
        is_str = isinstance(value, str)
        original_format = None
        if is_str:
            original_format = detect_datetime_format(value)
            value = ensure_pendulum_datetime(value)  # type: ignore

        # Apply lag based on the type of value
        if isinstance(value, (datetime, date)):
            delta = (
                timedelta(seconds=self.lag)
                if isinstance(value, datetime)
                else timedelta(days=self.lag)
            )
            value = value - delta if self.last_value_func is max else value + delta  # type: ignore

            # If originally a string, convert back to the original format
            if is_str and original_format:
                value = value.strftime(original_format)  # type: ignore

        elif isinstance(value, int):
            # Ensure that int types remain integers
            adjusted_value = value - self.lag if self.last_value_func is max else value + self.lag
            value = int(adjusted_value)  # type: ignore

        elif isinstance(value, float):
            value = value - self.lag if self.last_value_func is max else value + self.lag  # type: ignore

        else:
            # Handle unsupported types
            logger.warning(
                f"Lag is not supported for last_value_func: {self.last_value_func} and cursor type:"
                f" {type(value)}"
            )

        return value

    def get_state(self) -> IncrementalColumnState:
        """Returns an Incremental state for a particular cursor column"""
        if self.end_value is not None:
@@ -335,6 +383,10 @@ def _cursor_datetime_check(value: Any, arg_name: str) -> None:
    @property
    def last_value(self) -> Optional[TCursorValue]:
        s = self.get_state()

        if self.lag is not None:
            return self._apply_lag(s["last_value"])

        return s["last_value"]  # type: ignore

    def _transform_item(
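For context, a minimal usage sketch of the new `lag` parameter; the resource, column names, and values below are hypothetical and only illustrate the intended behavior:

```py
import dlt

@dlt.resource(primary_key="id", write_disposition="merge")
def events(cursor=dlt.sources.incremental("updated_at", lag=3600)):
    # With a datetime cursor, lag is interpreted as seconds: the effective last_value is
    # shifted one hour back, so rows updated inside that window are re-loaded on the next run.
    yield [{"id": 1, "updated_at": "2024-10-29T12:00:00Z", "status": "new"}]

pipeline = dlt.pipeline(pipeline_name="lag_example", destination="duckdb")
pipeline.run(events())
```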
123 changes: 123 additions & 0 deletions tests/load/pipeline/test_pipelines.py
@@ -997,3 +997,126 @@ def events_timezone_unset():

values = [r[0].strftime("%Y-%m-%dT%H:%M:%S.%f") for r in rows]
assert values == output_map[destination]["tables"][t]["timestamp_values"] # type: ignore


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(
        default_staging_configs=True, default_sql_configs=True, subset=["duckdb", "postgres"]
    ),
    ids=lambda x: x.name,
)
def test_pipeline_resource_incremental_int_lag(
    destination_config: DestinationTestConfiguration,
) -> None:
    """
    Test incremental lag behavior when updating previously loaded data.

    Three resources (`r1`, `r2`, `r3`) share the primary key 'id'.
    - `r1` loads `id=0` and `id=1`.
    - `r2` adds `id=2` and `id=3`.
    - `r3`, with lag=1, skips `id=1` and updates entries with `id=2`, `id=3`,
      and adds a new entry `id=4`. The merge operation reflects these changes.

    We validate that the final dataset correctly orders the updated and new entries,
    excluding `id=1` from the updates due to the lag.
    """

    dataset_name = f"{destination_config.destination_name}{uniq_id()}"
    pipeline = destination_config.setup_pipeline("pipeline", dataset_name=dataset_name)

    name = "items"

    @dlt.resource(name=name, primary_key="id")
    def r1(_=dlt.sources.incremental("id")):
Collaborator commented:

I have similar comments as to the test below:

  1. use just one resource with "append" write disposition. you can return the same response each time
  2. call the resource several times with different lags (use apply_hints to replace incremental)
  3. test if expected elements got duplicated
  4. IMO you'll have a deduplication bug that I describe above

Collaborator Author replied:

Implemented your comments except for number 2:

  • call the resource several time with different lags (use apply_hints to replace incremental)

I have used @pytest.mark.parametrize("lag", [-1, 10]) to execute the code with different lags instead of using apply_hints.

Also facing the deduplication bug - more info above in the other comments.
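
For reference, a rough sketch of what point 2 (swapping the lag via `apply_hints` instead of declaring separate resources) might look like; the resource contents, pipeline setup, and lag values are made up for illustration:

```py
import dlt

@dlt.resource(name="items", primary_key="id", write_disposition="append")
def items():
    yield from [{"id": 0}, {"id": 1}, {"id": 2}]

pipeline = dlt.pipeline(pipeline_name="lag_demo", destination="duckdb")
resource = items()
for lag in (-1, 10):
    # replace the incremental hint (and its lag) between runs instead of defining new resources
    resource.apply_hints(incremental=dlt.sources.incremental("id", lag=lag))
    pipeline.run(resource)
```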

yield from [{"id": 0, "name": "bobby"}, {"id": 1, "name": "jeremy"}]

@dlt.resource(name=name, primary_key="id")
def r2(_=dlt.sources.incremental("id")):
yield from [{"id": 2, "name": "james"}, {"id": 3, "name": "john"}]

# r3 has lag=1, so id=1 is ignored, only id>=2 is updated
updated_items = [
{"id": 1, "name": "maria"},
{"id": 2, "name": "mark"},
{"id": 3, "name": "pablo"},
]

@dlt.resource(name=name, primary_key="id", write_disposition="merge")
def r3(_=dlt.sources.incremental("id", lag=1)):
yield from [{"id": 4, "name": "max"}] + updated_items

pipeline.run(r1)
pipeline.run(r2)
pipeline.run(r3)

# Validate final dataset
with pipeline.sql_client() as sql_client:
assert [
row[0] for row in sql_client.execute_sql(f"SELECT name FROM {name} ORDER BY id")
] == ["bobby", "jeremy", "mark", "pablo", "max"]


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(
        default_staging_configs=True, default_sql_configs=True, subset=["duckdb", "postgres"]
    ),
    ids=lambda x: x.name,
)
@pytest.mark.parametrize("lag", [3602, 3601, 3600, 3599, -1])
def test_pipeline_resource_incremental_datetime_lag(
    destination_config: DestinationTestConfiguration, lag: float
) -> None:
    """
    Test incremental lag behavior for datetime data while using `id` as the primary key.

    - `r1` loads entries with `id` and `created_at` timestamps.
    - `r2`, with different lag values, updates entries but only considers records
      where `created_at` is within the lag window.

    We validate that the final dataset reflects the correct data updates, taking the lag into account.
    """

    dataset_name = f"{destination_config.destination_name}{uniq_id()}"
    pipeline = destination_config.setup_pipeline("pipeline", dataset_name=dataset_name)

    name = "events"

    @dlt.resource(name=name, primary_key="id")
    def r1(_=dlt.sources.incremental("created_at")):
Collaborator commented:

The concept of the test is good, but IMO you should create just one resource with merge write disposition that you call with the lag you want from the very beginning. This is how people will use it IMO.

Emulate returning "updated_events" on the second call to it, i.e. via some kind of nonlocal flag that tells it to add those events the second time.

the results of the test look good but please test two additional things

  • that you do not apply lag to "initial_value" (set it in incremental)
  • there's IMO issue with internal deduplication. please test for updated_events that update id=3:
    {
        "id": 3,
        "created_at": "2023-03-03T02:00:01Z",
        "event": "updated",
    }
IMO it will not be included into final result because we'll deduplicate it. you should IMO disable deduplication when lag is defined
```py
    @property
    def deduplication_disabled(self) -> bool:
        """Skip deduplication when length of the key is 0"""
        return isinstance(self.primary_key, (list, tuple)) and len(self.primary_key) == 0
```
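One possible way to act on that suggestion (an illustrative sketch, not necessarily what was merged) is to extend the property so a configured lag also turns deduplication off:

```py
    @property
    def deduplication_disabled(self) -> bool:
        """Skip deduplication when a lag is configured or the length of the key is 0"""
        if self.lag is not None:
            return True
        return isinstance(self.primary_key, (list, tuple)) and len(self.primary_key) == 0
```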

Collaborator Author replied:

You are right, there is a deduplication bug - I have captured it in the tests but surprisingly setting deduplication_disabled to True doesn't solve the issue. Any tips on how to solve it?


I have refactored the code and tests with your suggestions.


Question, what do you mean with the following point?

  • that you do not apply lag to "initial_value" (set it in incremental)

In the current implementation lag only applies to last_value, but I see that initially last_value is set to initial_value on get_state.

Do I need to skip the _apply_lag call when last_value == initial_value?

    def last_value(self) -> Optional[TCursorValue]:
        s = self.get_state()

        if self.lag is not None:
            return self._apply_lag(s["last_value"])

        return s["last_value"]  # type: ignore
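Purely as an illustration of the guard that question describes (not a confirmed fix), skipping the lag while the state still holds the configured `initial_value` could look like this:

```py
    @property
    def last_value(self) -> Optional[TCursorValue]:
        s = self.get_state()
        # illustrative: leave the initial_value-seeded state untouched, apply lag only afterwards
        if self.lag is not None and s["last_value"] != self.initial_value:
            return self._apply_lag(s["last_value"])
        return s["last_value"]  # type: ignore
```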

        yield from [
            {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "1"},
            {"id": 2, "created_at": "2023-03-03T01:00:01Z", "event": "2"},
            {
                "id": 3,
                "created_at": "2023-03-03T02:00:01Z",
                "event": "3",
            },  # the lag will be applied here
        ]

    updated_events = [
        {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "updated"},
        {"id": 2, "created_at": "2023-03-03T01:00:01Z", "event": "updated"},
    ]

    @dlt.resource(name=name, primary_key="id", write_disposition="merge")
    def r2(_=dlt.sources.incremental("created_at", lag=lag)):
        yield from [{"id": 4, "created_at": "2023-03-03T03:00:00Z", "event": "4"}] + updated_events

    pipeline.run(r1)
    pipeline.run(r2)

    # Validate final dataset
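    # r1's max created_at is 2023-03-03T02:00:01Z and last_value_func is max, so the lag is
    # subtracted from it: a lag of 3601s or 3602s moves the effective start to 01:00:00Z or
    # earlier and both updates are re-loaded; exactly 3600s lands on 01:00:01Z so only id=2
    # is re-loaded; 3599s and -1 re-load neither update.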
    results = {
        3602: ["updated", "updated", "3", "4"],
        3601: ["updated", "updated", "3", "4"],
        3600: ["1", "updated", "3", "4"],
        3599: ["1", "2", "3", "4"],
        -1: ["1", "2", "3", "4"],
    }

    with pipeline.sql_client() as sql_client:
        assert [
            row[0] for row in sql_client.execute_sql(f"SELECT event FROM {name} ORDER BY id")
        ] == results[int(lag)]