Add async variant of get_online_features #3927

Closed
tokoko opened this issue Feb 3, 2024 · 5 comments · Fixed by #4172
Labels
kind/feature New feature or request

Comments

@tokoko
Collaborator

tokoko commented Feb 3, 2024

Is your feature request related to a problem? Please describe.
Putting feast's python online store interface in production is a hard sell as it is, but the fact that there's no way to make it async makes it even harder. Using get_online_features with a good online store implementation might be suitable for services with relatively low traffic, but beyond a certain point it becomes hard to scale.

Describe the solution you'd like
Add a get_online_features_async method to the FeatureStore class. The method will either use a new online_read_async method from the configured online store or throw an exception if the online store doesn't implement the async variant.
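A minimal, self-contained sketch of the proposed behavior, using hypothetical toy classes (`ToyAsyncOnlineStore`, `ToyFeatureStore`, `serve_many`) rather than Feast's real ones, with store latency simulated by `asyncio.sleep`. It shows both outcomes described above (await the store's `online_read_async` if it exists, raise otherwise) and why async helps throughput: concurrent requests on one event loop overlap their I/O waits.

```python
import asyncio
import time
from typing import Any, Dict, List


class ToyAsyncOnlineStore:
    """Hypothetical online store; network latency is simulated with asyncio.sleep."""

    def __init__(self, latency_s: float = 0.05) -> None:
        self.latency_s = latency_s
        self.rows: Dict[int, Dict[str, Any]] = {1: {"conv_rate": 0.5}, 2: {"conv_rate": 0.7}}

    async def online_read_async(self, entity_ids: List[int]) -> List[Dict[str, Any]]:
        # While this await is pending, the event loop is free to run other requests.
        await asyncio.sleep(self.latency_s)
        return [self.rows.get(e, {}) for e in entity_ids]


class ToyFeatureStore:
    """Hypothetical stand-in for FeatureStore with only the async entry point."""

    def __init__(self, online_store: Any) -> None:
        self.online_store = online_store

    async def get_online_features_async(self, entity_ids: List[int]) -> List[Dict[str, Any]]:
        read_async = getattr(self.online_store, "online_read_async", None)
        if read_async is None:
            # As proposed: fail loudly rather than fall back to a blocking read.
            raise NotImplementedError(
                f"{type(self.online_store).__name__} has no online_read_async"
            )
        return await read_async(entity_ids)


async def serve_many(fs: ToyFeatureStore, n_requests: int) -> float:
    """Run n_requests concurrently and return the wall-clock time in seconds."""
    start = time.perf_counter()
    await asyncio.gather(
        *(fs.get_online_features_async([1, 2]) for _ in range(n_requests))
    )
    return time.perf_counter() - start
```

Running `asyncio.run(serve_many(ToyFeatureStore(ToyAsyncOnlineStore()), 20))` should take roughly one simulated latency period rather than twenty, because the sleeps overlap. This only illustrates the scaling argument; it is not a benchmark of Feast.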

Describe alternatives you've considered
The alternative is to forgo this, treat python get_online_features as something that comes with practical limitations, and focus on developing the alternatives in other languages, for example the java feature server.

Additional context
I admit I haven't really done any benchmarking to compare the alternatives; the claims are mostly just common-sense conclusions, but I'll gladly be corrected if others disagree.

@tokoko tokoko added the kind/feature New feature or request label Feb 3, 2024
@breno-costa
Contributor

breno-costa commented Apr 9, 2024

I have developed a custom SDK implementation with an async get_online_features method. I've been using Redis as the online store and also had to override the function def _get_client(self, online_store_config: RedisOnlineStoreConfig):, since the redis.asyncio package is used to create the Redis client instead of the sync one.

@shuchu
Collaborator

shuchu commented Apr 9, 2024

Shoot us a PR. :)

@shuchu shuchu self-assigned this Apr 24, 2024
@breno-costa
Contributor

breno-costa commented Apr 24, 2024

@shuchu @tokoko sorry for the late response. I can raise a PR for this change. I'm analyzing the changes based on the suggestions above. The change is not that simple, as await requires changes to be propagated through different parts of the code. These are the changes I see that must be done:

class FeatureStore:

    # new async version of existing method
    async def get_online_features_async( 
        self,
        features: Union[List[str], FeatureService],
        entity_rows: List[Dict[str, Any]],
        full_feature_names: bool = False,
    ) -> OnlineResponse:

        ...

        return await self._get_online_features_async(
            features=features,
            entity_values=columnar,
            full_feature_names=full_feature_names,
            native_entity_values=True,
        )

    # new async version of existing method
    async def _get_online_features_async(
        self,
        features: Union[List[str], FeatureService],
        entity_values: Mapping[
            str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
        ],
        full_feature_names: bool = False,
        native_entity_values: bool = True,
    ):

        # do a bunch of things

        for table, requested_features in grouped_refs:

            ...

            # await async method call
            feature_data = await self._read_from_online_store_async(
                table_entity_values,
                provider,
                requested_features,
                table,
            )

        # do another bunch of things

    # new async version of existing method (distinct name so it does not
    # shadow the sync _read_from_online_store)
    async def _read_from_online_store_async(
        self,
        entity_rows: Iterable[Mapping[str, Value]],
        provider: Provider,
        requested_features: List[str],
        table: FeatureView,
    ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:

        ...

        # await async method call (the sync online_read is not awaitable)
        read_rows = await provider.online_read_async(
            config=self.config,
            table=table,
            entity_keys=entity_key_protos,
            requested_features=requested_features,
        )

        ...

Changes to Provider:

class Provider(ABC):

    # new async version of existing method
    async def online_read_async(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        ...

class PassthroughProvider(Provider):

    # new async version of existing method
    async def online_read_async(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List:

        ...

        # await async function call
        result = await self.online_store.online_read_async(
            config, table, entity_keys, requested_features
        )

Add a new method to the OnlineStore interface:

class OnlineStore:

    # new async version of existing method
    async def online_read_async(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:

Write specific implementations for online stores. Example for RedisOnlineStore, which would have an async implementation:

class RedisOnlineStore(OnlineStore):

    # new async version of existing method
    async def online_read_async(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:

        ...

        client = self._get_client_async(online_store_config)

        # execute async redis commands

Implementation example for BigtableOnlineStore, which wouldn't have an async implementation:

class BigtableOnlineStore(OnlineStore):

    async def online_read_async(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        raise NotImplementedError("Async online read method is not implemented for this online store")

wdyt? Does it make sense or am I forgetting something?
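To see why `await` has to be threaded through every layer, here is a self-contained miniature of the proposed call chain (FeatureStore → Provider → OnlineStore). All classes are hypothetical toys that mirror the method names in the proposal; they are not Feast's actual implementation, and `table` is just a string here.

```python
import asyncio
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# Same shape as the proposal's return type, with plain floats instead of ValueProto.
Row = Tuple[Optional[datetime], Optional[Dict[str, float]]]


class ToyOnlineStore:
    _data = {"driver:1": {"conv_rate": 0.5}}

    async def online_read_async(self, table: str, entity_keys: List[str]) -> List[Row]:
        await asyncio.sleep(0)  # stand-in for a non-blocking client call
        ts = datetime(2024, 4, 24)
        return [
            (ts, self._data[k]) if k in self._data else (None, None)
            for k in entity_keys
        ]


class ToyPassthroughProvider:
    def __init__(self, online_store: ToyOnlineStore) -> None:
        self.online_store = online_store

    async def online_read_async(self, table: str, entity_keys: List[str]) -> List[Row]:
        # This layer awaits the store, so it has to be `async def` too.
        return await self.online_store.online_read_async(table, entity_keys)


class ToyFeatureStore:
    def __init__(self, provider: ToyPassthroughProvider) -> None:
        self.provider = provider

    async def _read_from_online_store_async(
        self, table: str, entity_keys: List[str]
    ) -> List[Row]:
        return await self.provider.online_read_async(table, entity_keys)

    async def get_online_features_async(
        self, tables: List[str], entity_keys: List[str]
    ) -> Dict[str, List[Row]]:
        result: Dict[str, List[Row]] = {}
        for table in tables:
            # Without `await`, this would store a coroutine object, not rows.
            result[table] = await self._read_from_online_store_async(table, entity_keys)
        return result
```

Each layer that awaits something must itself be a coroutine, which is why a sync `online_read` on the provider cannot simply call the store's async read.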

@tokoko
Collaborator Author

tokoko commented Apr 24, 2024

looks spot on to me. One thing about the changes to OnlineStore abstract class, maybe it can be added with a default implementation (that raises an exception) so only the online stores that implement async will have to be touched...

# do a bunch of things

Do you plan to do some refactoring here or copy-paste for now? I'm fine with either for starters.

@breno-costa
Contributor

breno-costa commented Apr 24, 2024

looks spot on to me. One thing about the changes to OnlineStore abstract class, maybe it can be added with a default implementation (that raises an exception) so only the online stores that implement async will have to be touched...

makes sense.
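The default-implementation idea can be sketched with a toy abstract base class (hypothetical names, not Feast's actual `OnlineStore`): the sync `online_read` stays abstract, while `online_read_async` gets a concrete body that raises, so stores that are not touched remain instantiable.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class ToyOnlineStore(ABC):
    @abstractmethod
    def online_read(self, entity_keys: List[str]) -> List[Optional[Dict[str, float]]]:
        ...

    async def online_read_async(
        self, entity_keys: List[str]
    ) -> List[Optional[Dict[str, float]]]:
        # Concrete default (not @abstractmethod): existing subclasses keep working
        # and only fail if someone actually calls the async path.
        raise NotImplementedError(
            f"{type(self).__name__} does not implement online_read_async"
        )


class SyncOnlyStore(ToyOnlineStore):  # e.g. an untouched store such as Bigtable
    def online_read(self, entity_keys):
        return [None for _ in entity_keys]


class AsyncCapableStore(ToyOnlineStore):  # e.g. a store with an async client
    def online_read(self, entity_keys):
        return [{"conv_rate": 0.5} for _ in entity_keys]

    async def online_read_async(self, entity_keys):
        await asyncio.sleep(0)  # stand-in for a non-blocking client call
        return [{"conv_rate": 0.5} for _ in entity_keys]
```

If `online_read_async` were declared with `@abstractmethod` instead, every existing subclass that does not override it would raise `TypeError` at instantiation.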

Do you plan to do some refactoring here or copy-paste for now? I'm fine with either for starters.

I'll try to move some parts to separate functions so the code becomes more maintainable - any future changes will be reflected in both sync and async implementations.
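One way to do that split, sketched with hypothetical helpers (`_prepare_entity_keys`, `_build_response`) and a toy store rather than Feast's real internals: the I/O-free steps before and after the read are written once, and the sync and async entry points differ only in how they perform the read.

```python
import asyncio
from typing import Dict, List, Optional


def _prepare_entity_keys(entity_rows: List[Dict[str, int]]) -> List[str]:
    # Shared pre-processing: no I/O, so a plain function usable from both paths.
    return [f"driver:{row['driver_id']}" for row in entity_rows]


def _build_response(keys: List[str], values: List[Optional[float]]) -> Dict[str, list]:
    # Shared post-processing: shape the raw values into a response.
    return {"entity_key": keys, "conv_rate": values}


class ToyStore:
    _data = {"driver:1": 0.5, "driver:2": 0.7}

    def read(self, keys: List[str]) -> List[Optional[float]]:
        return [self._data.get(k) for k in keys]

    async def read_async(self, keys: List[str]) -> List[Optional[float]]:
        await asyncio.sleep(0)  # stand-in for a non-blocking client call
        return [self._data.get(k) for k in keys]


def get_online_features(store: ToyStore, entity_rows: List[Dict[str, int]]) -> Dict[str, list]:
    keys = _prepare_entity_keys(entity_rows)
    return _build_response(keys, store.read(keys))


async def get_online_features_async(
    store: ToyStore, entity_rows: List[Dict[str, int]]
) -> Dict[str, list]:
    keys = _prepare_entity_keys(entity_rows)
    # Only the read differs from the sync version.
    return _build_response(keys, await store.read_async(keys))
```

A fix to either helper then applies to both code paths automatically.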
