Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pendulum 3 support #19480

Merged
merged 2 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,17 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
"storage_tests_sqlalchemy_1_4",
"daemon_sensor_tests",
"daemon_tests",
"definitions_tests_old_pendulum",
"definitions_tests",
"definitions_tests_pendulum_1",
"definitions_tests_pendulum_2",
"general_tests",
"general_tests_old_protobuf",
"scheduler_tests",
"scheduler_tests_old_pendulum",
"scheduler_tests_pendulum_1",
"scheduler_tests_pendulum_2",
"execution_tests",
"storage_tests",
"type_signature_tests",
"definitions_tests",
"asset_defs_tests",
"launcher_tests",
"logging_tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
all_daemons_healthy,
)
from dagster._seven.compat.pendulum import pendulum_freeze_time
from utils import start_daemon


Expand All @@ -23,5 +24,5 @@ def test_heartbeat():
+ DEFAULT_DAEMON_HEARTBEAT_TOLERANCE_SECONDS
+ 5
)
with pendulum.test(frozen_datetime):
with pendulum_freeze_time(frozen_datetime):
assert all_daemons_healthy(instance) is False
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.workspace.context import WorkspaceRequestContext
from dagster._seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime
from dagster._seven.compat.pendulum import create_pendulum_time
from dagster._seven.compat.pendulum import create_pendulum_time, pendulum_freeze_time
from dagster._utils import Counter, traced_counter
from dagster_graphql.implementation.utils import UserFacingGraphQLError
from dagster_graphql.test.utils import (
Expand Down Expand Up @@ -697,7 +697,7 @@ def test_unloadable_schedule(graphql_context):

stopped_origin = _get_unloadable_schedule_origin("unloadable_stopped")

with pendulum.test(initial_datetime):
with pendulum_freeze_time(initial_datetime):
instance.add_instigator_state(running_instigator_state)

instance.add_instigator_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dagster._core.workspace.context import WorkspaceRequestContext
from dagster._daemon import get_default_daemon_logger
from dagster._daemon.sensor import execute_sensor_iteration
from dagster._seven.compat.pendulum import pendulum_freeze_time
from dagster._utils import Counter, traced_counter
from dagster._utils.error import SerializableErrorInfo
from dagster_graphql.implementation.utils import UserFacingGraphQLError
Expand Down Expand Up @@ -1062,15 +1063,15 @@ def test_sensor_tick_range(graphql_context: WorkspaceRequestContext):

now = pendulum.now("US/Central")
one = now.subtract(days=2).subtract(hours=1)
with pendulum.test(one):
with pendulum_freeze_time(one):
_create_tick(graphql_context)

two = now.subtract(days=1).subtract(hours=1)
with pendulum.test(two):
with pendulum_freeze_time(two):
_create_tick(graphql_context)

three = now.subtract(hours=1)
with pendulum.test(three):
with pendulum_freeze_time(three):
_create_tick(graphql_context)

result = execute_dagster_graphql(
Expand Down Expand Up @@ -1174,7 +1175,7 @@ def test_sensor_ticks_filtered(graphql_context: WorkspaceRequestContext):
)

now = pendulum.now("US/Central")
with pendulum.test(now):
with pendulum_freeze_time(now):
_create_tick(graphql_context) # create a success tick

# create a started tick
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import datetime
from typing import TYPE_CHECKING, AbstractSet, Optional, Sequence, Tuple

import pendulum

from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._seven.compat.pendulum import (
PendulumInterval,
)
from dagster._utils.schedules import cron_string_iterator

if TYPE_CHECKING:
Expand All @@ -27,7 +28,7 @@ def get_execution_period_for_policy(
freshness_policy: FreshnessPolicy,
effective_data_time: Optional[datetime.datetime],
current_time: datetime.datetime,
) -> pendulum.Period:
) -> PendulumInterval:
if freshness_policy.cron_schedule:
tick_iterator = cron_string_iterator(
start_timestamp=current_time.timestamp(),
Expand All @@ -41,18 +42,18 @@ def get_execution_period_for_policy(
tick = next(tick_iterator)
required_data_time = tick - freshness_policy.maximum_lag_delta
if effective_data_time is None or effective_data_time < required_data_time:
return pendulum.Period(start=required_data_time, end=tick)
return PendulumInterval(start=required_data_time, end=tick)

else:
# occurs when asset is missing
if effective_data_time is None:
return pendulum.Period(
return PendulumInterval(
# require data from at most maximum_lag_delta ago
start=current_time - freshness_policy.maximum_lag_delta,
# this data should be available as soon as possible
end=current_time,
)
return pendulum.Period(
return PendulumInterval(
# we don't want to execute this too frequently
start=effective_data_time + 0.9 * freshness_policy.maximum_lag_delta,
end=max(effective_data_time + freshness_policy.maximum_lag_delta, current_time),
Expand All @@ -64,7 +65,7 @@ def get_execution_period_and_evaluation_data_for_policies(
policies: AbstractSet[FreshnessPolicy],
effective_data_time: Optional[datetime.datetime],
current_time: datetime.datetime,
) -> Tuple[Optional[pendulum.Period], Optional["TextRuleEvaluationData"]]:
) -> Tuple[Optional[PendulumInterval], Optional["TextRuleEvaluationData"]]:
"""Determines a range of times for which you can kick off an execution of this asset to solve
the most pressing constraint, alongside a maximum number of additional constraints.
"""
Expand All @@ -84,7 +85,7 @@ def get_execution_period_and_evaluation_data_for_policies(
if merged_period is None:
merged_period = period
elif period.start <= merged_period.end:
merged_period = pendulum.Period(
merged_period = PendulumInterval(
start=max(period.start, merged_period.start),
end=period.end,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import datetime
from typing import AbstractSet, NamedTuple, Optional

import pendulum

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._serdes import whitelist_for_serdes
from dagster._seven.compat.pendulum import pendulum_create_timezone
from dagster._utils.schedules import (
is_valid_cron_schedule,
reverse_cron_string_iterator,
Expand Down Expand Up @@ -122,7 +121,7 @@ def __new__(
)
try:
# Verify that the timezone can be loaded
pendulum.tz.timezone(cron_schedule_timezone) # type: ignore
pendulum_create_timezone(cron_schedule_timezone) # type: ignore
except Exception as e:
raise DagsterInvalidDefinitionError(
"Invalid cron schedule timezone '{cron_schedule_timezone}'. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
cast,
)

import pendulum
from typing_extensions import TypeAlias

import dagster._check as check
Expand All @@ -29,6 +28,7 @@
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.scoped_resources_builder import Resources, ScopedResourcesBuilder
from dagster._serdes import whitelist_for_serdes
from dagster._seven.compat.pendulum import pendulum_create_timezone
from dagster._utils import IHasInternalInit, ensure_gen
from dagster._utils.merger import merge_dicts
from dagster._utils.schedules import is_valid_cron_schedule
Expand Down Expand Up @@ -692,7 +692,7 @@ def _execution_fn(context: ScheduleEvaluationContext) -> RunRequestIterator:
if self._execution_timezone:
try:
# Verify that the timezone can be loaded
pendulum.tz.timezone(self._execution_timezone) # type: ignore
pendulum_create_timezone(self._execution_timezone)
except Exception as e:
raise DagsterInvalidDefinitionError(
f"Invalid execution timezone {self._execution_timezone} for {name}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
unpack_value,
)
from dagster._seven.compat.pendulum import (
_IS_PENDULUM_2,
_IS_PENDULUM_1,
PRE_TRANSITION,
PendulumDateTime,
create_pendulum_time,
to_timezone,
Expand Down Expand Up @@ -146,7 +147,7 @@ def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> PendulumDateTime:
# Pendulum 1.x erroneously believes that there are two instances of the *second* hour after
# a datetime transition, so to work around this we calculate the timestamp of the next
# microsecond of the given datetime.
dt_microsecond = dt.microsecond + 1 if not _IS_PENDULUM_2 else dt.microsecond
dt_microsecond = dt.microsecond + 1 if _IS_PENDULUM_1 else dt.microsecond
dt = create_pendulum_time(
dt.year,
dt.month,
Expand All @@ -156,9 +157,9 @@ def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> PendulumDateTime:
dt.second,
dt_microsecond,
tz=tz,
dst_rule=pendulum.PRE_TRANSITION,
dst_rule=PRE_TRANSITION,
)
if not _IS_PENDULUM_2:
if _IS_PENDULUM_1:
dt = dt.add(microseconds=-1)
return dt

Expand All @@ -181,7 +182,7 @@ def pack(
self, datetime: Optional[datetime], whitelist_map: WhitelistMap, descent_path: str
) -> Optional[Mapping[str, Any]]:
if datetime:
check.invariant(datetime.tzinfo is not None)
check.invariant(datetime.tzinfo is not None, "No timezone set")
pendulum_datetime = pendulum.instance(datetime, tz=datetime.tzinfo)
timezone_name = pendulum_datetime.timezone.name

Expand Down
82 changes: 74 additions & 8 deletions python_modules/dagster/dagster/_seven/compat/pendulum.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
from contextlib import contextmanager
from unittest import mock

import packaging.version
import pendulum
Expand All @@ -10,19 +11,40 @@
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 2
)

_IS_PENDULUM_3 = (
hasattr(pendulum, "__version__")
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 3
)

# pendulum 1 has no __version__ property
_IS_PENDULUM_1 = not _IS_PENDULUM_2 and not _IS_PENDULUM_3

POST_TRANSITION = pendulum.tz.POST_TRANSITION if _IS_PENDULUM_3 else pendulum.POST_TRANSITION
PRE_TRANSITION = pendulum.tz.PRE_TRANSITION if _IS_PENDULUM_3 else pendulum.PRE_TRANSITION
TRANSITION_ERROR = pendulum.tz.TRANSITION_ERROR if _IS_PENDULUM_3 else pendulum.TRANSITION_ERROR


def pendulum_create_timezone(tz_name: str):
if _IS_PENDULUM_3:
from pendulum.tz.timezone import Timezone

return Timezone(tz_name)
else:
return pendulum.tz.timezone(tz_name) # type: ignore


@contextmanager
def mock_pendulum_timezone(override_timezone):
if _IS_PENDULUM_2:
with pendulum.tz.test_local_timezone(pendulum.tz.timezone(override_timezone)):
if _IS_PENDULUM_1:
with pendulum.tz.LocalTimezone.test(pendulum.Timezone.load(override_timezone)):
yield
else:
with pendulum.tz.LocalTimezone.test(pendulum.Timezone.load(override_timezone)):
with pendulum.tz.test_local_timezone(pendulum.tz.timezone(override_timezone)):
yield


def create_pendulum_time(year, month, day, *args, **kwargs):
if "tz" in kwargs and "dst_rule" in kwargs and not _IS_PENDULUM_2:
if "tz" in kwargs and "dst_rule" in kwargs and _IS_PENDULUM_1:
tz = pendulum.timezone(kwargs.pop("tz"))
dst_rule = kwargs.pop("dst_rule")

Expand All @@ -39,17 +61,53 @@ def create_pendulum_time(year, month, day, *args, **kwargs):
)
)

if "dst_rule" in kwargs and _IS_PENDULUM_3:
dst_rule = kwargs.pop("dst_rule")
if dst_rule == PRE_TRANSITION:
kwargs["fold"] = 0
elif dst_rule == POST_TRANSITION:
kwargs["fold"] = 1
elif dst_rule == TRANSITION_ERROR:
tz_name = kwargs.pop("tz")
assert tz_name
return pendulum.instance(
pendulum_create_timezone(tz_name).convert(
datetime.datetime(
year,
month,
day,
*args,
**kwargs,
),
raise_on_unknown_times=True,
)
)

return (
pendulum.datetime(year, month, day, *args, **kwargs)
if _IS_PENDULUM_2
else pendulum.create(year, month, day, *args, **kwargs)
pendulum.create(year, month, day, *args, **kwargs)
if _IS_PENDULUM_1
else pendulum.datetime(year, month, day, *args, **kwargs)
)


PendulumDateTime: TypeAlias = (
pendulum.DateTime if _IS_PENDULUM_2 else pendulum.Pendulum # type: ignore[attr-defined]
pendulum.Pendulum if _IS_PENDULUM_1 else pendulum.DateTime # type: ignore[attr-defined]
)

PendulumInterval: TypeAlias = (
pendulum.Interval if _IS_PENDULUM_3 else pendulum.Period # type: ignore[attr-defined]
)


@contextmanager
def pendulum_freeze_time(t):
if _IS_PENDULUM_3:
with mock.patch("pendulum.now", return_value=t):
yield
else:
with pendulum.test(t) as frozen_time:
yield frozen_time


# Workaround for issue with .in_tz() in pendulum:
# https://github.com/sdispater/pendulum/issues/535
Expand All @@ -59,3 +117,11 @@ def to_timezone(dt: PendulumDateTime, tz: str):
if timestamp < 0:
return pendulum.from_timestamp(0, tz=tz) + datetime.timedelta(seconds=timestamp)
return pendulum.from_timestamp(dt.timestamp(), tz=tz)


def get_crontab_day_of_week(dt: PendulumDateTime) -> int:
if _IS_PENDULUM_3:
# In pendulum 3, 0-6 is Monday-Sunday (unlike crontab, where 0-6 is Sunday-Saturday)
return (dt.day_of_week + 1) % 7
else:
return dt.day_of_week
Loading