Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add interfaces for batch materialization engine #2901

Merged
merged 11 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sdk/python/feast/infra/materialization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationTask,
)
from .local_engine import LocalMaterializationEngine

__all__ = [
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
"MaterializationJob",
"MaterializationTask",
"BatchMaterializationEngine",
"LocalMaterializationEngine",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import dataclasses
achals marked this conversation as resolved.
Show resolved Hide resolved
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Callable, List, Optional, Sequence, Union

from tqdm import tqdm

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.repo_config import RepoConfig
from feast.stream_feature_view import StreamFeatureView


@dataclasses.dataclass
achals marked this conversation as resolved.
Show resolved Hide resolved
class MaterializationTask:
project: str
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
start_time: datetime
end_time: datetime
tqdm_builder: Callable[[int], tqdm]


class MaterializationJob(ABC):
task: MaterializationTask

@abstractmethod
def status(self) -> str:
...

@abstractmethod
def should_be_retried(self) -> bool:
...

@abstractmethod
def job_id(self) -> str:
...

@abstractmethod
def url(self) -> Optional[str]:
...


class BatchMaterializationEngine(ABC):
def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
self.repo_config = repo_config
self.offline_store = offline_store
self.online_store = online_store

@abstractmethod
def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
...

@abstractmethod
def materialize(
self, registry, tasks: List[MaterializationTask]
achals marked this conversation as resolved.
Show resolved Hide resolved
) -> List[MaterializationJob]:
...

@abstractmethod
def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
entities: Sequence[Entity],
):
...
Loading