Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Using one single function call for utcnow(). #4307

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
from feast.repo_contents import RepoContents
from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now
from feast.version import get_version

warnings.simplefilter("once", DeprecationWarning)
Expand Down Expand Up @@ -1246,7 +1247,7 @@ def materialize_incremental(
>>> from feast import FeatureStore, RepoConfig
>>> from datetime import datetime, timedelta
>>> fs = FeatureStore(repo_path="project/feature_repo")
>>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5))
>>> fs.materialize_incremental(end_date=_utc_now() - timedelta(minutes=5))
Materializing...
<BLANKLINE>
...
Expand All @@ -1270,15 +1271,15 @@ def materialize_incremental(
f" either a ttl to be set or for materialize() to have been run at least once."
)
elif feature_view.ttl.total_seconds() > 0:
start_date = datetime.utcnow() - feature_view.ttl
start_date = _utc_now() - feature_view.ttl
else:
# TODO(felixwang9817): Find the earliest timestamp for this specific feature
# view from the offline store, and set the start date to that timestamp.
print(
f"Since the ttl is 0 for feature view {Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}, "
"the start date will be set to 1 year before the current time."
)
start_date = datetime.utcnow() - timedelta(weeks=52)
start_date = _utc_now() - timedelta(weeks=52)
provider = self._get_provider()
print(
f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}"
Expand Down Expand Up @@ -1335,7 +1336,7 @@ def materialize(
>>> from datetime import datetime, timedelta
>>> fs = FeatureStore(repo_path="project/feature_repo")
>>> fs.materialize(
... start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10)
... start_date=_utc_now() - timedelta(hours=3), end_date=_utc_now() - timedelta(minutes=10)
... )
Materializing...
<BLANKLINE>
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.utils import get_user_agent
from feast.utils import _utc_now, get_user_agent

from .bigquery_source import (
BigQueryLoggingDestination,
Expand Down Expand Up @@ -701,7 +701,7 @@ def _upload_entity_df(

# Ensure that the table expires after some time
table = client.get_table(table=table_name)
table.expires = datetime.utcnow() + timedelta(minutes=30)
table.expires = _utc_now() + timedelta(minutes=30)
client.update_table(table, ["expires"])

return table
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import datetime
import signal
from dataclasses import dataclass
from enum import Enum
Expand All @@ -16,6 +15,7 @@
from feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map import (
trino_to_pa_value_type,
)
from feast.utils import _utc_now


class QueryStatus(Enum):
Expand Down Expand Up @@ -97,12 +97,12 @@ def __init__(self, query_text: str, cursor: Cursor):
def execute(self) -> Results:
try:
self.status = QueryStatus.RUNNING
start_time = datetime.datetime.utcnow()
start_time = _utc_now()

self._cursor.execute(operation=self.query_text)
rows = self._cursor.fetchall()

end_time = datetime.datetime.utcnow()
end_time = _utc_now()
self.execution_time = end_time - start_time
self.status = QueryStatus.COMPLETED

Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.utils import get_user_agent
from feast.utils import _utc_now, get_user_agent

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,7 +122,7 @@ def update(
entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)
entity.update({"created_ts": datetime.utcnow()})
entity.update({"created_ts": _utc_now()})
client.put(entity)

for table in tables_to_delete:
Expand Down Expand Up @@ -457,7 +457,7 @@ def update(self):
entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)
entity.update({"created_ts": datetime.utcnow()})
entity.update({"created_ts": _utc_now()})
client.put(entity)

def teardown(self):
Expand Down
9 changes: 5 additions & 4 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from abc import abstractmethod
from datetime import datetime, timedelta
from datetime import timedelta
from threading import Lock
from typing import List, Optional

Expand All @@ -15,6 +15,7 @@
from feast.project_metadata import ProjectMetadata
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now

logger = logging.getLogger(__name__)

Expand All @@ -27,7 +28,7 @@ def __init__(
):
self.cached_registry_proto = self.proto()
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
self.cached_registry_proto_created = datetime.utcnow()
self.cached_registry_proto_created = _utc_now()
self._refresh_lock = Lock()
self.cached_registry_proto_ttl = timedelta(
seconds=cache_ttl_seconds if cache_ttl_seconds is not None else 0
Expand Down Expand Up @@ -318,7 +319,7 @@ def refresh(self, project: Optional[str] = None):
self.cached_registry_proto, project
)
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = datetime.utcnow()
self.cached_registry_proto_created = _utc_now()

def _refresh_cached_registry_if_necessary(self):
with self._refresh_lock:
Expand All @@ -329,7 +330,7 @@ def _refresh_cached_registry_if_necessary(self):
self.cached_registry_proto_ttl.total_seconds()
> 0 # 0 ttl means infinity
and (
datetime.utcnow()
_utc_now()
> (
self.cached_registry_proto_created
+ self.cached_registry_proto_ttl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

import os
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse

from feast.infra.registry.registry import RegistryConfig
from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.utils import _utc_now

REGISTRY_SCHEMA_VERSION = "1"

Expand Down Expand Up @@ -89,7 +89,7 @@ def teardown(self):

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
registry_proto.last_updated.FromDatetime(_utc_now())

file_obj = TemporaryFile()
file_obj.write(registry_proto.SerializeToString())
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/registry/file.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import uuid
from datetime import datetime
from pathlib import Path

from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.utils import _utc_now


class FileRegistryStore(RegistryStore):
Expand Down Expand Up @@ -37,7 +37,7 @@ def teardown(self):

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
registry_proto.last_updated.FromDatetime(_utc_now())
file_dir = self._filepath.parent
file_dir.mkdir(exist_ok=True)
with open(self._filepath, mode="wb", buffering=0) as f:
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/registry/gcs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse

from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.utils import _utc_now


class GCSRegistryStore(RegistryStore):
Expand Down Expand Up @@ -62,7 +62,7 @@ def teardown(self):

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
registry_proto.last_updated.FromDatetime(_utc_now())
# we have already checked the bucket exists so no need to do it again
gs_bucket = self.gcs_client.get_bucket(self._bucket)
blob = gs_bucket.blob(self._blob)
Expand Down
21 changes: 11 additions & 10 deletions sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from feast.repo_contents import RepoContents
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now

REGISTRY_SCHEMA_VERSION = "1"

Expand Down Expand Up @@ -217,7 +218,7 @@ def clone(self) -> "Registry":
if self.cached_registry_proto
else RegistryProto()
)
new_registry.cached_registry_proto_created = datetime.utcnow()
new_registry.cached_registry_proto_created = _utc_now()
new_registry._registry_store = NoopRegistryStore()
return new_registry

Expand Down Expand Up @@ -248,7 +249,7 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra:
def apply_entity(self, entity: Entity, project: str, commit: bool = True):
entity.is_valid()

now = datetime.utcnow()
now = _utc_now()
if not entity.created_timestamp:
entity.created_timestamp = now
entity.last_updated_timestamp = now
Expand Down Expand Up @@ -334,7 +335,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True):
def apply_feature_service(
self, feature_service: FeatureService, project: str, commit: bool = True
):
now = datetime.utcnow()
now = _utc_now()
if not feature_service.created_timestamp:
feature_service.created_timestamp = now
feature_service.last_updated_timestamp = now
Expand Down Expand Up @@ -390,7 +391,7 @@ def apply_feature_view(
):
feature_view.ensure_valid()

now = datetime.utcnow()
now = _utc_now()
if not feature_view.created_timestamp:
feature_view.created_timestamp = now
feature_view.last_updated_timestamp = now
Expand Down Expand Up @@ -517,7 +518,7 @@ def apply_materialization(
existing_feature_view.materialization_intervals.append(
(start_date, end_date)
)
existing_feature_view.last_updated_timestamp = datetime.utcnow()
existing_feature_view.last_updated_timestamp = _utc_now()
feature_view_proto = existing_feature_view.to_proto()
feature_view_proto.spec.project = project
del self.cached_registry_proto.feature_views[idx]
Expand All @@ -539,7 +540,7 @@ def apply_materialization(
existing_stream_feature_view.materialization_intervals.append(
(start_date, end_date)
)
existing_stream_feature_view.last_updated_timestamp = datetime.utcnow()
existing_stream_feature_view.last_updated_timestamp = _utc_now()
stream_feature_view_proto = existing_stream_feature_view.to_proto()
stream_feature_view_proto.spec.project = project
del self.cached_registry_proto.stream_feature_views[idx]
Expand Down Expand Up @@ -664,7 +665,7 @@ def apply_saved_dataset(
project: str,
commit: bool = True,
):
now = datetime.utcnow()
now = _utc_now()
if not saved_dataset.created_timestamp:
saved_dataset.created_timestamp = now
saved_dataset.last_updated_timestamp = now
Expand Down Expand Up @@ -812,7 +813,7 @@ def _prepare_registry_for_changes(self, project: str):
registry_proto = RegistryProto()
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
self.cached_registry_proto = registry_proto
self.cached_registry_proto_created = datetime.utcnow()
self.cached_registry_proto_created = _utc_now()

# Initialize project metadata if needed
assert self.cached_registry_proto
Expand Down Expand Up @@ -848,7 +849,7 @@ def _get_registry_proto(
self.cached_registry_proto_ttl.total_seconds()
> 0 # 0 ttl means infinity
and (
datetime.utcnow()
_utc_now()
> (
self.cached_registry_proto_created
+ self.cached_registry_proto_ttl
Expand All @@ -871,7 +872,7 @@ def _get_registry_proto(
logger.info("Registry cache expired, so refreshing")
registry_proto = self._registry_store.get_registry_proto()
self.cached_registry_proto = registry_proto
self.cached_registry_proto_created = datetime.utcnow()
self.cached_registry_proto_created = _utc_now()

if not project:
return registry_proto
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/registry/s3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse
Expand All @@ -9,6 +8,7 @@
from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.utils import _utc_now

try:
import boto3
Expand Down Expand Up @@ -70,7 +70,7 @@ def teardown(self):

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
registry_proto.last_updated.FromDatetime(_utc_now())
# we have already checked the bucket exists so no need to do it again
file_obj = TemporaryFile()
file_obj.write(registry_proto.SerializeToString())
Expand Down
Loading
Loading