feat: add incremental lag for datetime, int, and float cursors #1957

Merged · 20 commits · Oct 29, 2024

Changes from all commits
48 changes: 48 additions & 0 deletions dlt/common/time.py
@@ -1,5 +1,6 @@
import contextlib
import datetime # noqa: I251
import re
from typing import Any, Optional, Union, overload, TypeVar, Callable # noqa

from pendulum.parsing import (
@@ -154,6 +155,53 @@ def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time:
raise TypeError(f"Cannot coerce {value} to a pendulum.Time object.")


def detect_datetime_format(value: str) -> Optional[str]:
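"""Attempt to detect the strptime format of an ISO 8601-like datetime string.

Returns the matching format string, or None if no known pattern matches.
"""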
format_patterns = {
# Full datetime with 'Z' (UTC) or timezone offset
re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$"): "%Y-%m-%dT%H:%M:%SZ", # UTC 'Z'
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z$"
): "%Y-%m-%dT%H:%M:%S.%fZ", # UTC with fractional seconds
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S%z", # Timezone offset
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{4}$"
): "%Y-%m-%dT%H:%M:%S%z", # Timezone without colon
# Full datetime with fractional seconds and timezone
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}$"
): "%Y-%m-%dT%H:%M:%S.%f%z",
re.compile(
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{4}$"
): "%Y-%m-%dT%H:%M:%S.%f%z", # Timezone without colon
# Datetime without timezone
re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$"): "%Y-%m-%dT%H:%M:%S", # No timezone
re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}$"): "%Y-%m-%dT%H:%M", # Minute precision
re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}$"): "%Y-%m-%dT%H", # Hour precision
# Date-only formats
re.compile(r"^\d{4}-\d{2}-\d{2}$"): "%Y-%m-%d", # Date only
re.compile(r"^\d{4}-\d{2}$"): "%Y-%m", # Year and month
re.compile(r"^\d{4}$"): "%Y", # Year only
# Week-based date formats
re.compile(r"^\d{4}-W\d{2}$"): "%Y-W%W", # Week-based date
re.compile(r"^\d{4}-W\d{2}-\d{1}$"): "%Y-W%W-%u", # Week-based date with day
# Ordinal date formats (day of year)
re.compile(r"^\d{4}-\d{3}$"): "%Y-%j", # Ordinal date
# Compact formats (no dashes)
re.compile(r"^\d{8}$"): "%Y%m%d", # Compact date format
re.compile(r"^\d{6}$"): "%Y%m", # Compact year and month format
}

# Match against each compiled regular expression
for pattern, format_str in format_patterns.items():
if pattern.match(value):
return format_str

# Return None if no pattern matches
return None
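
For a sense of the mapping, a minimal sketch of the detector's behavior (values chosen for illustration):

```py
from dlt.common.time import detect_datetime_format

# each input maps to the strptime format that can reproduce it
assert detect_datetime_format("2024-01-25T10:20:30Z") == "%Y-%m-%dT%H:%M:%SZ"
assert detect_datetime_format("2024-01-25") == "%Y-%m-%d"
assert detect_datetime_format("2024-W04") == "%Y-W%W"
assert detect_datetime_format("not a date") is None
```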


def to_py_datetime(value: datetime.datetime) -> datetime.datetime:
"""Convert a pendulum.DateTime to a py datetime object.

42 changes: 36 additions & 6 deletions dlt/extract/incremental/__init__.py
@@ -1,7 +1,7 @@
import os
from datetime import datetime # noqa: I251
from typing import Generic, ClassVar, Any, Optional, Type, Dict
from typing_extensions import get_origin, get_args
from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union
from typing_extensions import get_args

import inspect
from functools import wraps
@@ -41,13 +41,13 @@
LastValueFunc,
OnCursorValueMissing,
)
from dlt.extract.pipe import Pipe
from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform
from dlt.extract.incremental.transform import (
JsonIncremental,
ArrowIncremental,
IncrementalTransform,
)
from dlt.extract.incremental.lag import apply_lag

try:
from dlt.common.libs.pyarrow import is_arrow_item
@@ -101,6 +101,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
The values passed explicitly to Incremental will be ignored.
Note that if a logical "end date" is present, then "end_value" will also be set, which means that resource state is not used and exactly this range of dates will be loaded
on_cursor_value_missing: Specify what happens when the cursor_path does not exist in a record or a record has `None` at the cursor_path: raise, include, exclude
lag: Optional value used to define a lag or attribution window. For datetime cursors, this is interpreted as seconds; for date cursors, as days. For numeric cursors, it is applied in the cursor's own units, added or subtracted depending on the last_value_func.
"""

# this is config/dataclass so declare members
@@ -111,6 +112,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
row_order: Optional[TSortOrder] = None
allow_external_schedulers: bool = False
on_cursor_value_missing: OnCursorValueMissing = "raise"
lag: Optional[float] = None

# incremental acting as empty
EMPTY: ClassVar["Incremental[Any]"] = None
@@ -126,6 +128,7 @@ def __init__(
row_order: Optional[TSortOrder] = None,
allow_external_schedulers: bool = False,
on_cursor_value_missing: OnCursorValueMissing = "raise",
lag: Optional[float] = None,
) -> None:
# make sure that path is valid
if cursor_path:
@@ -149,6 +152,8 @@

self._cached_state: IncrementalColumnState = None
"""State dictionary cached on first access"""

self.lag = lag
super().__init__(lambda x: x) # TODO:

self.end_out_of_range: bool = False
@@ -185,6 +190,7 @@ def _make_transforms(self) -> None:
self._primary_key,
set(self._cached_state["unique_hashes"]),
self.on_cursor_value_missing,
self.lag,
)

@classmethod
@@ -208,9 +214,14 @@ def merge(self, other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue
>>> my_resource(updated=incremental(initial_value='2023-01-01', end_value='2023-02-01'))
"""
# func, resource name and primary key are not part of the dict
kwargs = dict(self, last_value_func=self.last_value_func, primary_key=self._primary_key)
kwargs = dict(
self, last_value_func=self.last_value_func, primary_key=self._primary_key, lag=self.lag
)
for key, value in dict(
other, last_value_func=other.last_value_func, primary_key=other.primary_key
other,
last_value_func=other.last_value_func,
primary_key=other.primary_key,
lag=other.lag,
).items():
if value is not None:
kwargs[key] = value
@@ -284,6 +295,7 @@ def parse_native_representation(self, native_value: Any) -> None:
self._primary_key = merged._primary_key
self.allow_external_schedulers = merged.allow_external_schedulers
self.row_order = merged.row_order
self.lag = merged.lag
self.__is_resolved__ = self.__is_resolved__
else: # TODO: Maybe check if callable(getattr(native_value, '__lt__', None))
# Passing bare value `incremental=44` gets parsed as initial_value
@@ -335,7 +347,25 @@ def _cursor_datetime_check(value: Any, arg_name: str) -> None:
@property
def last_value(self) -> Optional[TCursorValue]:
s = self.get_state()
return s["last_value"] # type: ignore
last_value: TCursorValue = s["last_value"]

if self.lag:
if self.last_value_func not in (max, min):
logger.warning(
f"Lag on {self.resource_name} is only supported for max or min last_value_func."
f" Provided: {self.last_value_func}"
)
elif self.end_value is not None:
logger.info(
f"Lag on {self.resource_name} is deactivated if end_value is set in"
" incremental."
)
elif last_value is not None:
last_value = apply_lag(
self.lag, s["initial_value"], last_value, self.last_value_func
)

return last_value

def _transform_item(
self, transformer: IncrementalTransform, row: TDataItem
74 changes: 74 additions & 0 deletions dlt/extract/incremental/lag.py
@@ -0,0 +1,74 @@
from datetime import datetime, timedelta, date # noqa: I251
from typing import Union

from dlt.common import logger
from dlt.common.time import ensure_pendulum_datetime, detect_datetime_format

from . import TCursorValue, LastValueFunc


def _apply_lag_to_value(
lag: float, value: TCursorValue, last_value_func: LastValueFunc[TCursorValue]
) -> TCursorValue:
"""Applies lag to a value, in case of `str` types it attempts to return a string
with the lag applied preserving original format of a datetime/date
"""
# Determine if the input is originally a string and capture its format
is_str = isinstance(value, str)
value_format = detect_datetime_format(value) if is_str else None
is_str_date = value_format in ("%Y%m%d", "%Y-%m-%d") if value_format else None
parsed_value = ensure_pendulum_datetime(value) if is_str else value

if isinstance(parsed_value, (datetime, date)):
parsed_value = _apply_lag_to_datetime(lag, parsed_value, last_value_func, is_str_date)
# go back to string or pass exact type
value = parsed_value.strftime(value_format) if value_format else parsed_value # type: ignore[assignment]

elif isinstance(parsed_value, (int, float)):
value = _apply_lag_to_number(lag, parsed_value, last_value_func) # type: ignore[assignment]

else:
logger.error(
f"Lag is not supported for cursor type: {type(value)} with last_value_func:"
f" {last_value_func}. Strings must parse to DateTime or Date."
)

return value


def _apply_lag_to_datetime(
lag: float,
value: Union[datetime, date],
last_value_func: LastValueFunc[TCursorValue],
is_str_date: bool,
) -> Union[datetime, date]:
if isinstance(value, datetime) and not is_str_date:
delta = timedelta(seconds=lag)
elif is_str_date or isinstance(value, date):
delta = timedelta(days=lag)
return value - delta if last_value_func is max else value + delta


def _apply_lag_to_number(
lag: float, value: Union[int, float], last_value_func: LastValueFunc[TCursorValue]
) -> Union[int, float]:
adjusted_value = value - lag if last_value_func is max else value + lag
return int(adjusted_value) if isinstance(value, int) else adjusted_value


def apply_lag(
lag: float,
initial_value: TCursorValue,
last_value: TCursorValue,
last_value_func: LastValueFunc[TCursorValue],
) -> TCursorValue:
"""Applies lag to `last_value` but prevents it to cross `initial_value`: observing order of last_value_func"""
# Skip lag adjustment to avoid out-of-bounds issues
lagged_last_value = _apply_lag_to_value(lag, last_value, last_value_func)
if (
initial_value is not None
and last_value_func((initial_value, lagged_last_value)) == initial_value
):
# do not cross initial_value
return initial_value
return lagged_last_value
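
A quick sketch of the clamping behavior with an integer cursor (illustrative values):

```py
from dlt.extract.incremental.lag import apply_lag

# with last_value_func=max, lag moves last_value back...
assert apply_lag(10, 100, 150, max) == 140
# ...but it is clamped so it never crosses initial_value
assert apply_lag(100, 100, 150, max) == 100
```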
12 changes: 10 additions & 2 deletions dlt/extract/incremental/transform.py
@@ -57,6 +57,7 @@ def __init__(
primary_key: Optional[TTableHintTemplate[TColumnNames]],
unique_hashes: Set[str],
on_cursor_value_missing: OnCursorValueMissing = "raise",
lag: Optional[float] = None,
) -> None:
self.resource_name = resource_name
self.cursor_path = cursor_path
@@ -70,7 +71,7 @@
self.unique_hashes = unique_hashes
self.start_unique_hashes = set(unique_hashes)
self.on_cursor_value_missing = on_cursor_value_missing

self.lag = lag
# compile jsonpath
self._compiled_cursor_path = compile_path(cursor_path)
# for simple column name we'll fallback to search in dict
@@ -109,7 +110,14 @@ def __call__(

@property
def deduplication_disabled(self) -> bool:
"""Skip deduplication when length of the key is 0"""
"""Skip deduplication when length of the key is 0 or if lag is applied."""
# disable deduplication if end value is set - state is not saved
if self.end_value is not None:
return True
# disable deduplication if lag is applied - destination must deduplicate ranges
if self.lag and self.last_value_func in (min, max):
return True
# disable deduplication if primary_key = ()
return isinstance(self.primary_key, (list, tuple)) and len(self.primary_key) == 0


3 changes: 2 additions & 1 deletion dlt/extract/incremental/typing.py
@@ -1,6 +1,6 @@
from typing_extensions import TypedDict

from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar
from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union

from dlt.common.schema.typing import TColumnNames
from dlt.common.typing import TSortOrder
@@ -25,3 +25,4 @@ class IncrementalArgs(TypedDict, total=False):
end_value: Optional[str]
row_order: Optional[TSortOrder]
allow_external_schedulers: Optional[bool]
lag: Optional[Union[float, int]]
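
For illustration, a dict-style configuration using the new key might look like this (a sketch; `cursor_path` and `initial_value` are assumed to be declared in the part of the class not shown in this hunk):

```py
from dlt.extract.incremental.typing import IncrementalArgs

args: IncrementalArgs = {
    "cursor_path": "updated_at",
    "initial_value": "2024-01-01T00:00:00Z",
    "lag": 3600.0,
}
```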
59 changes: 59 additions & 0 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -1087,6 +1087,65 @@
assert len(result_filtered) == 2
```

## Lag / Attribution Window
In many cases, certain data should be reacquired during incremental loading. For example, you may want to always capture the last 7 days of data when fetching daily analytics reports, or refresh Slack message replies with a moving window of 7 days. This is where the concept of "lag" or "attribution window" comes into play.

The `lag` parameter is a float that can be used with several types of incremental cursors: `datetime`, `date`, `integer`, and `float`. It only takes effect when `last_value_func` is `min` or `max` (the default is `max`).

### How `lag` Works

- **Datetime cursors**: `lag` is the number of seconds added to or subtracted from the `last_value` loaded.
- **Date cursors**: `lag` represents days.
- **Numeric cursors (integer or float)**: `lag` is applied in the cursor's own units.

This flexibility allows `lag` to adapt to different data contexts.
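
For example, with an integer cursor the same mechanics apply in the cursor's own units (a minimal sketch; the resource and field names are hypothetical):

```py
import dlt

@dlt.resource(name="sequence", primary_key="id", write_disposition="merge")
def sequence_resource(
    # with the default last_value_func=max, lag=10 re-reads the last 10 ids
    seq=dlt.sources.incremental("id", initial_value=0, lag=10),
):
    yield from ({"id": i} for i in range(100))
```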


### Example using `datetime` incremental cursor with `merge` as `write_disposition`

This example demonstrates how to use a `datetime` cursor with a `lag` parameter, applying `merge` as the `write_disposition`. The setup runs twice; during the second run, the `lag` causes recently loaded entries to be re-fetched so that updates are captured.

1. **First Run**: Loads `initial_entries`.
2. **Second Run**: Loads `second_run_events` with the specified lag, refreshing previously loaded entries.

Together, the two runs show how `lag` keeps a defined period of data refreshed, capturing updates or changes within the attribution window.

```py
import duckdb
import dlt

pipeline = dlt.pipeline(
    destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")),
)

# Flag to indicate the second run
is_second_run = False

@dlt.resource(name="events", primary_key="id", write_disposition="merge")
def events_resource(
    _=dlt.sources.incremental("created_at", lag=3600, last_value_func=max)
):
    global is_second_run

    # Data for the initial run
    initial_entries = [
        {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "1"},
        {"id": 2, "created_at": "2023-03-03T02:00:00Z", "event": "2"},  # lag applied during second run
    ]

    # Data for the second run
    second_run_events = [
        {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "1_updated"},
        {"id": 2, "created_at": "2023-03-03T02:00:01Z", "event": "2_updated"},
        {"id": 3, "created_at": "2023-03-03T03:00:00Z", "event": "3"},
    ]

    # Yield data based on the current run
    yield from second_run_events if is_second_run else initial_entries

# Run the pipeline twice
pipeline.run(events_resource)
is_second_run = True  # Update flag for second run
pipeline.run(events_resource)
```

After the first run, the stored `last_value` is `2023-03-03T02:00:00Z`; with `lag=3600`, the second run starts one hour earlier, so records 1 and 2 are re-fetched and replaced with their updated versions by the `merge` write disposition.


## Doing a full refresh
