def generate_snowflake_ctx() -> snowflake.connector.SnowflakeConnection
Generate a Snowflake context from the settings
class WyvernService()
The class to define, generate and run a Wyvern service
Attributes:
host
- The host to run the service on. Defaults to localhost.
port
- The port to run the service on. Defaults to 5000.
async def register_routes(
route_components: List[Type[APIRouteComponent]]) -> None
Register the routes for the Wyvern service
Arguments:
route_components
- The list of route components to register
Returns:
None
@staticmethod
def generate(*,
route_components: Optional[List[Type[APIRouteComponent]]] = None,
realtime_feature_components: Optional[List[
Type[RealtimeFeatureComponent]]] = None,
host: str = "127.0.0.1",
port: int = 5000) -> WyvernService
Generate a Wyvern service
Arguments:
route_components
- The list of route components to register. Defaults to None.
realtime_feature_components
- The list of realtime feature components to register. Defaults to None.
host
- The host to run the service on. Defaults to localhost.
port
- The port to run the service on. Defaults to 5000.
Returns:
WyvernService
- The generated Wyvern service
@staticmethod
def run(*,
route_components: List[Type[APIRouteComponent]],
realtime_feature_components: Optional[List[
Type[RealtimeFeatureComponent]]] = None,
host: str = "127.0.0.1",
port: int = 5000)
Generate and run a Wyvern service
Arguments:
route_components
- The list of route components to register.
realtime_feature_components
- The list of realtime feature components to register. Defaults to None.
host
- The host to run the service on. Defaults to localhost.
port
- The port to run the service on. Defaults to 5000.
Returns:
None
@staticmethod
def generate_app(*,
route_components: Optional[List[
Type[APIRouteComponent]]] = None,
realtime_feature_components: Optional[List[
Type[RealtimeFeatureComponent]]] = None,
host: str = "127.0.0.1",
port: int = 5000) -> FastAPI
Generate a Wyvern service and return the FastAPI app
Arguments:
route_components
- The list of route components to register. Defaults to None.
realtime_feature_components
- The list of realtime feature components to register. Defaults to None.
host
str, optional - The host to run the service on. Defaults to localhost.
port
int, optional - The port to run the service on. Defaults to 5000.
Returns:
FastAPI
- The generated FastAPI app
class Settings(BaseSettings)
Settings for the Wyvern service
Extends from BaseSettings class, allowing values to be overridden by environment variables. This is useful in production for secrets you do not wish to save in code
Attributes:
ENVIRONMENT
- The environment the service is running in. Defaults to development.
PROJECT_NAME
- The name of the project. Defaults to default.
REDIS_HOST
- The host of the redis instance. Defaults to localhost.
REDIS_PORT
- The port of the redis instance. Defaults to 6379.
WYVERN_API_KEY
- The API key for the Wyvern API. Defaults to "", an empty string.
WYVERN_BASE_URL
- The base url of the Wyvern API. Defaults to https://api.wyvern.ai
WYVERN_ONLINE_FEATURES_PATH
- The path to the online features endpoint. Defaults to /feature/get-online-features
WYVERN_HISTORICAL_FEATURES_PATH
- The path to the historical features endpoint. Defaults to /feature/get-historical-features
WYVERN_FEATURE_STORE_URL
- The url of the Wyvern feature store. Defaults to https://api.wyvern.ai
SNOWFLAKE_ACCOUNT
- The account name of the Snowflake instance. Defaults to "", an empty string.
SNOWFLAKE_USER
- The username of the Snowflake instance. Defaults to "", an empty string.
SNOWFLAKE_PASSWORD
- The password of the Snowflake instance. Defaults to "", an empty string.
SNOWFLAKE_ROLE
- The role of the Snowflake instance. Defaults to "", an empty string.
SNOWFLAKE_WAREHOUSE
- The warehouse of the Snowflake instance. Defaults to "", an empty string.
SNOWFLAKE_DATABASE
- The database of the Snowflake instance. Defaults to "", an empty string.
SNOWFLAKE_OFFLINE_STORE_SCHEMA
- The schema of the Snowflake instance. Defaults to PUBLIC.
AWS_ACCESS_KEY_ID
- The access key id for the AWS instance. Defaults to "", an empty string.
AWS_SECRET_ACCESS_KEY
- The secret access key for the AWS instance. Defaults to "", an empty string.
AWS_REGION_NAME
- The region name for the AWS instance. Defaults to us-east-1.
FEATURE_STORE_TIMEOUT
- The timeout for the feature store. Defaults to 60 seconds.
SERVER_TIMEOUT
- The timeout for the server. Defaults to 60 seconds.
REDIS_BATCH_SIZE
- The batch size for the redis instance. Defaults to 100.
WYVERN_INDEX_VERSION
- The version of the Wyvern index. Defaults to 1.
MODELBIT_BATCH_SIZE
- The batch size for the modelbit. Defaults to 30.
EXPERIMENTATION_ENABLED
- Whether experimentation is enabled. Defaults to False.
EXPERIMENTATION_PROVIDER
- The experimentation provider. Defaults to ExperimentationProvider.EPPO.value.
EPPO_API_KEY
- The API key for EPPO (an experimentation provider). Defaults to "", an empty string.
FEATURE_STORE_ENABLED
- Whether the feature store is enabled. Defaults to True.
EVENT_LOGGING_ENABLED
- Whether event logging is enabled. Defaults to True.
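To illustrate how environment variables take precedence over the defaults listed above, here is a minimal standard-library sketch. It does not use pydantic's BaseSettings or the real Settings class: SketchSettings and _env are hypothetical names, and only three of the attributes are modelled.

```python
import os
from dataclasses import dataclass, field


def _env(name: str, default, cast=str):
    """Return the environment value for `name` if it is set, else the default."""
    raw = os.environ.get(name)
    return default if raw is None else cast(raw)


@dataclass
class SketchSettings:
    # Hypothetical stand-in for Settings; defaults copied from the attribute list.
    ENVIRONMENT: str = field(default_factory=lambda: _env("ENVIRONMENT", "development"))
    REDIS_PORT: int = field(default_factory=lambda: _env("REDIS_PORT", 6379, int))
    SNOWFLAKE_PASSWORD: str = field(default_factory=lambda: _env("SNOWFLAKE_PASSWORD", ""))


# A secret supplied through the environment never has to appear in code.
os.environ["SNOWFLAKE_PASSWORD"] = "s3cret"
os.environ["REDIS_PORT"] = "6380"
settings = SketchSettings()
```

Values read from the environment arrive as strings, which is why the sketch casts REDIS_PORT to int before storing it.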
def wyvern_encode(data: Dict[str, Any]) -> bytes
Encode a dict to compressed bytes using lz4.frame.
def wyvern_decode(data: Union[bytes, str]) -> Dict[str, Any]
Decode compressed bytes to a dict with lz4.frame.
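The encode/decode pair is meant to round-trip a dict through compressed bytes. The sketch below shows that round trip under two assumptions that are not confirmed by the text above: the dict is serialized as JSON, and zlib from the standard library stands in for lz4.frame so the example has no third-party dependency. sketch_encode and sketch_decode are hypothetical names.

```python
import json
import zlib
from typing import Any, Dict


def sketch_encode(data: Dict[str, Any]) -> bytes:
    # Serialize to JSON (assumption), then compress. zlib stands in for lz4.frame.
    return zlib.compress(json.dumps(data).encode("utf-8"))


def sketch_decode(blob: bytes) -> Dict[str, Any]:
    # Reverse the two steps: decompress, then parse the JSON text.
    return json.loads(zlib.decompress(blob).decode("utf-8"))


payload = {"product_id": "p1", "score": 0.5}
restored = sketch_decode(sketch_encode(payload))
```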
class AiohttpClientWrapper()
AiohttpClientWrapper is a singleton wrapper around aiohttp.ClientSession.
def start()
Instantiate the client. Call from the FastAPI startup hook.
async def stop()
Gracefully shutdown. Call from FastAPI shutdown hook.
def __call__()
Calling the instantiated AiohttpClientWrapper returns the wrapped singleton.
The aiohttp client singleton. Use this to make requests.
Example:
```python
from wyvern.core.http import aiohttp_client

# Inside a coroutine, after the client has been started:
response = await aiohttp_client().get("https://www.wyvern.ai")
```
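The start, stop and call lifecycle described for AiohttpClientWrapper can be sketched without aiohttp. This is an illustration of the pattern only, not the library's implementation: FakeSession is a hypothetical stand-in for aiohttp.ClientSession, and ClientWrapperSketch is a hypothetical name.

```python
import asyncio
from typing import Optional


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class ClientWrapperSketch:
    def __init__(self) -> None:
        self._session: Optional[FakeSession] = None

    def start(self) -> None:
        # Create the single shared session (the startup hook would call this).
        self._session = FakeSession()

    async def stop(self) -> None:
        # Close and drop the shared session (the shutdown hook would call this).
        if self._session is not None:
            await self._session.close()
            self._session = None

    def __call__(self) -> FakeSession:
        # Calling the wrapper hands back the one shared session.
        if self._session is None:
            raise RuntimeError("client not started")
        return self._session


client = ClientWrapperSketch()
client.start()
same_session = client() is client()
asyncio.run(client.stop())
```

Raising when the client has not been started is a design choice of this sketch; it makes a missing startup hook fail loudly instead of silently creating a second session.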
class ExperimentationEventData(BaseModel)
Data class for ExperimentationEvent.
Attributes:
experiment_id
- The experiment id.
entity_id
- The entity id.
result
- The result of the experiment. Can be None.
timestamp
- The timestamp of the event.
metadata
- The metadata of the event such as targeting parameters etc.
has_error
- Whether the request has errored or not.
class ExperimentationEvent(LoggedEvent[ExperimentationEventData])
Event class for ExperimentationEvent.
Attributes:
event_type
- The event type. This is always EventType.EXPERIMENTATION.
class EppoExperimentationClient(BaseExperimentationProvider)
An experimentation client specifically for the Eppo platform.
Extends the BaseExperimentationProvider to provide functionality using the Eppo client.
Methods:
- init() -> None
- get_result(experiment_id: str, entity_id: str, **kwargs) -> str
- log_result(experiment_id: str, entity_id: str, variant: str) -> None
def get_result(experiment_id: str, entity_id: str, **kwargs) -> str
Fetches the variant for a given experiment and entity from the Eppo client.
Arguments:
- experiment_id (str): The unique ID of the experiment.
- entity_id (str): The unique ID of the entity (e.g., user or other subject).
- **kwargs: Additional arguments to be passed to the Eppo client's get_assignment method.
Returns:
- str: The assigned variant for the given experiment and entity.
def log_result(experiment_id: str,
entity_id: str,
variant: Optional[str] = None,
has_error: bool = False,
**kwargs) -> None
Logs the result for a given experiment and entity.
Arguments:
- experiment_id (str): The unique ID of the experiment.
- entity_id (str): The unique ID of the entity.
- variant (str): The assigned variant for the given experiment and entity.
Note:
- This method is overridden to do nothing because the assignment logger we set in Eppo already handles result logging upon assignment.
class ExperimentationProvider(str, Enum)
An enum for the experimentation providers.
class BaseExperimentationProvider(ABC)
A base class for experimentation providers. All providers should inherit from this and implement the necessary methods.
@abstractmethod
def get_result(experiment_id: str, entity_id: str, **kwargs) -> str
Get the result (variant) for a given experiment and entity.
Arguments:
- experiment_id (str): The unique ID of the experiment.
- entity_id (str): The unique ID of the entity.
- kwargs (dict): Any additional arguments to pass to the provider for targeting.
Returns:
- str: The result (variant) assigned to the entity for the specified experiment.
@abstractmethod
def log_result(experiment_id: str,
entity_id: str,
variant: Optional[str] = None,
has_error: bool = False,
**kwargs) -> None
Log the result (variant) for a given experiment and entity.
Arguments:
- experiment_id (str): The unique ID of the experiment.
- entity_id (str): The unique ID of the entity.
- variant (str): The result (variant) assigned to the entity for the specified experiment.
- kwargs (dict): Any additional arguments to pass to the provider for targeting.
Returns:
- None
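A provider only has to implement the two abstract methods described above. The sketch below mirrors that interface with a local abstract class rather than importing the real one. ProviderSketch and HashSplitProvider are hypothetical, and the hash-based 50/50 split is an illustrative assumption, not how any real provider assigns variants.

```python
import hashlib
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple


class ProviderSketch(ABC):
    """Mirrors the two abstract methods described above."""

    @abstractmethod
    def get_result(self, experiment_id: str, entity_id: str, **kwargs) -> str: ...

    @abstractmethod
    def log_result(self, experiment_id: str, entity_id: str,
                   variant: Optional[str] = None, has_error: bool = False,
                   **kwargs) -> None: ...


class HashSplitProvider(ProviderSketch):
    """Hypothetical provider: deterministic split by hashing the two IDs."""

    def __init__(self) -> None:
        self.logged: List[Tuple[str, str, Optional[str], bool]] = []

    def get_result(self, experiment_id: str, entity_id: str, **kwargs) -> str:
        digest = hashlib.sha256(f"{experiment_id}:{entity_id}".encode()).digest()
        return "treatment" if digest[0] % 2 else "control"

    def log_result(self, experiment_id: str, entity_id: str,
                   variant: Optional[str] = None, has_error: bool = False,
                   **kwargs) -> None:
        # Keep the log in memory so the sketch has no external dependency.
        self.logged.append((experiment_id, entity_id, variant, has_error))


provider = HashSplitProvider()
variant = provider.get_result("exp_1", "user_42")
provider.log_result("exp_1", "user_42", variant)
```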
class ExperimentationClient()
A client for interacting with experimentation providers.
def __init__(provider_name: str, api_key: Optional[str] = None)
Initializes the ExperimentationClient with a specified provider.
Arguments:
- provider_name (str): The name of the experimentation provider (e.g., "eppo").
def get_experiment_result(experiment_id: str, entity_id: str,
**kwargs) -> Optional[str]
Get the result (variant) for a given experiment and entity using the chosen provider.
Arguments:
- experiment_id (str): The unique ID of the experiment.
- entity_id (str): The unique ID of the entity.
- kwargs (dict): Any additional arguments to pass to the provider for targeting.
Returns:
- str: The result (variant) assigned to the entity for the specified experiment.
def current() -> Optional[WyvernRequest]
Get the current request context
Returns:
The current request context, or None if there is none
def ensure_current_request() -> WyvernRequest
Get the current request context, or raise an error if there is none
Returns:
The current request context if there is one
Raises:
RuntimeError
- If there is no current request context
def set(request: WyvernRequest) -> None
Set the current request context
Arguments:
request
- The request context to set
Returns:
None
def reset() -> None
Reset the current request context
Returns:
None
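The four request-context helpers above behave like accessors around a single context-local value. A minimal sketch follows, assuming the value is held in a contextvars.ContextVar (the text above does not say how it is stored). A plain dict stands in for WyvernRequest, and set_current is a hypothetical name used to avoid shadowing the built-in set.

```python
import contextvars
from typing import Optional

# Assumption: one context variable holds the current request, None when unset.
_request_ctx = contextvars.ContextVar("request", default=None)


def current() -> Optional[dict]:
    return _request_ctx.get()


def ensure_current_request() -> dict:
    request = _request_ctx.get()
    if request is None:
        raise RuntimeError("no current request context")
    return request


def set_current(request: dict) -> None:
    _request_ctx.set(request)


def reset() -> None:
    _request_ctx.set(None)


before = current()
set_current({"request_id": "r1"})
during = ensure_current_request()
reset()
after = current()
```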
A WyvernFeature defines the type of a feature in Wyvern. It can be a float, a string, a list of floats, or None.
@app.command()
def init(project: str = typer.Argument(...,
help="Name of the project")) -> None
Initializes Wyvern application template code
Arguments:
project
str - Name of the project
@app.command()
def run(
path: str = "pipelines.main:app",
host: Annotated[
str,
typer.Option(help="Host to run the application on"),
] = "0.0.0.0",
port: Annotated[
int,
typer.Option(
help="Port to run the application on. Default port is 5001"),
] = 5001
) -> None
Starts Wyvern application server
Example usage: wyvern run --path pipelines.main:app --host 0.0.0.0 --port 5001
Arguments:
path
str - Path to the Wyvern app. Default path is pipelines.main:app
host
str - Host to run the application on. Default host is 0.0.0.0
port
int - Port to run the application on. Default port is 5001
@app.command()
def redis() -> None
Starts Redis server. This command will also install redis locally if it's not installed.
def ensure_async_client(func: Callable) -> Callable
Ensure that the async client is open before calling the function and close it after calling the function
Arguments:
func
- The function to be wrapped
Returns:
The wrapped function
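The open-before, close-after behaviour described for ensure_async_client is a standard decorator pattern. The sketch below shows that pattern only: FakeAsyncClient, ensure_client_sketch and ApiSketch are hypothetical, and the assumption that the client lives on self.client is not taken from the text above.

```python
import functools
from typing import Any, Callable


class FakeAsyncClient:
    """Hypothetical client that only tracks whether it is open."""

    def __init__(self) -> None:
        self.is_open = False

    def open(self) -> None:
        self.is_open = True

    def close(self) -> None:
        self.is_open = False


def ensure_client_sketch(func: Callable) -> Callable:
    @functools.wraps(func)
    def wrapper(self, *args: Any, **kwargs: Any) -> Any:
        self.client.open()
        try:
            return func(self, *args, **kwargs)
        finally:
            # Close even if the wrapped call raises.
            self.client.close()

    return wrapper


class ApiSketch:
    def __init__(self) -> None:
        self.client = FakeAsyncClient()

    @ensure_client_sketch
    def fetch(self) -> bool:
        # Report whether the client was open while the method ran.
        return self.client.is_open


api = ApiSketch()
was_open_during_call = api.fetch()
```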
class WyvernAPI()
Wyvern API client
@ensure_async_client
def get_historical_features(
features: List[str], entities: Union[Dict[Hashable, List[Any]],
pd.DataFrame]) -> pd.DataFrame
Aggregate all the historical features, including the offline features in your data warehouse and the historical real-time features being consumed by wyvern pipeline.
Arguments:
features
- A list of feature names.
entities
- A dictionary or pandas DataFrame of entity names and their values. Requirements for entities:
- entities must have request and timestamp keys
- request is a list of the request IDs of the requests entering Wyvern's pipeline
- timestamp is a list of the timestamps of those requests
- the remaining columns are the entities for the features and the user interaction data
Returns:
A pandas DataFrame with all the feature data you're requesting from the entities.
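The requirements on entities can be checked before calling get_historical_features. The sketch below builds a dictionary-form entities argument and validates it. check_entities is a hypothetical helper, the column names product and purchased are made up for illustration, and both the timestamp string format and the equal-length check are assumptions.

```python
from typing import Any, Dict, List

REQUIRED_KEYS = ("request", "timestamp")


def check_entities(entities: Dict[str, List[Any]]) -> None:
    """Raise ValueError unless `entities` has the required keys."""
    missing = [key for key in REQUIRED_KEYS if key not in entities]
    if missing:
        raise ValueError(f"entities is missing required keys: {missing}")
    lengths = {len(values) for values in entities.values()}
    if len(lengths) > 1:
        # Assumption: every column describes the same rows, so lengths must match.
        raise ValueError("all entity columns must have the same length")


entities = {
    "request": ["req_1", "req_2"],
    "timestamp": ["2023-08-01T00:00:00", "2023-08-01T00:05:00"],  # format is an assumption
    "product": ["p1", "p2"],  # hypothetical entity column
    "purchased": [1, 0],  # hypothetical user interaction column
}
check_entities(entities)
```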
class CandidateEventData(EntityEventData)
Event data for a candidate event
Attributes:
candidate_score
- The score of the candidate
candidate_order
- The order of the candidate in the list of candidates
class PaginationRequest(GenericModel, Generic[T])
This is the input to the PaginationComponent.
Attributes:
pagination_fields
- The pagination fields that are used to compute the pagination.
entities
- The entities that need to be paginated.
class PaginationComponent(Component[PaginationRequest[T], List[T]])
This component is used to paginate the entities. It takes in the pagination fields and the entities and returns the paginated entities.
async def execute(input: PaginationRequest[T], **kwargs) -> List[T]
This method paginates the entities based on the pagination fields.
Validations:
- The ranking page should be greater than or equal to 0.
- The candidate page should be greater than or equal to 0.
- The candidate page size should be less than or equal to 1000.
- The number of entities should be less than or equal to 1000.
- The user page size should be less than or equal to 100.
- The user page size should be less than or equal to the candidate page size.
- The end index should be less than the number of entities.
- The end index should be greater than the start index.
Returns:
The paginated entities.
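The validations above can be read as guards around a slice of the entity list. The sketch below is one plausible reading, not the component's code: the exact index arithmetic is not spelled out above, so computing the start index as the user page offset minus the candidate page offset is an assumption, and paginate is a hypothetical name.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def paginate(entities: Sequence[T], user_page: int, user_page_size: int,
             candidate_page: int, candidate_page_size: int) -> List[T]:
    if user_page < 0 or candidate_page < 0:
        raise ValueError("page numbers must be >= 0")
    if candidate_page_size > 1000 or len(entities) > 1000:
        raise ValueError("at most 1000 candidates are supported")
    if user_page_size > 100 or user_page_size > candidate_page_size:
        raise ValueError("user page size is too large")
    # Assumption: `entities` holds one candidate page, so the global offset of
    # the user page is shifted back by the offset of the candidate page.
    start = user_page * user_page_size - candidate_page * candidate_page_size
    end = min(start + user_page_size, len(entities))
    if start < 0 or start >= end:
        raise ValueError("requested page is outside the candidate page")
    return list(entities[start:end])


candidates = list(range(50))
third_page = paginate(candidates, user_page=2, user_page_size=10,
                      candidate_page=0, candidate_page_size=50)
```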
class PaginationFields(BaseModel)
Pagination fields for requests. This is a mixin class that can be used in any request that requires pagination.
Attributes:
user_page_size
- Number of items per user-facing page
user_page
- Zero-indexed user-facing page number
candidate_page_size
- The size of the candidate page
candidate_page
- The zero-indexed page number for the candidate set
The primary entity is the entity that is the main entity for the feature. For example, if we are computing the feature for a user, the primary entity would be the user.
The secondary entity is the entity that is the secondary entity for the feature. For example, if we are computing the feature for a user and a product, the secondary entity would be the product. If we are computing the feature for a user, the secondary entity would be None.
class RealtimeFeatureRequest(GenericModel, Generic[REQUEST_ENTITY])
This is the request that is passed into the realtime feature component.
class RealtimeFeatureEntity(GenericModel, Generic[PRIMARY_ENTITY,
SECONDARY_ENTITY])
This is the entity that is passed into the realtime feature component. It contains the primary entity and the secondary entity. If the feature is only for the primary entity, the secondary entity will be None.
class RealtimeFeatureComponent(Component[
Tuple[
RealtimeFeatureRequest[REQUEST_ENTITY],
RealtimeFeatureEntity[PRIMARY_ENTITY, SECONDARY_ENTITY],
],
Optional[FeatureData],
], Generic[PRIMARY_ENTITY, SECONDARY_ENTITY, REQUEST_ENTITY])
This is the base class for all realtime feature components. It contains the logic for computing the realtime feature. The realtime feature component can be used to compute features for a single entity, two entities, or a request. The realtime feature component can also be used to compute composite features for two entities.
The realtime feature component is a generic class that takes in the primary entity, secondary entity, and request entity as type parameters. The primary entity is the entity that is the main entity for the feature. For example, if we are computing the feature for a user, the primary entity would be the user. The secondary entity is the entity that is the secondary entity for the feature. For example, if we are computing the feature for a user and a product, the secondary entity would be the product. If we are computing the feature for a user, the secondary entity would be None. The request entity is the request that is passed into the realtime feature component. We can use the request entity to compute features for a request. For example, if we are computing the realtime features for a ranking request, the request entity would be the ranking request. We can combine the primary entity, secondary entity, and request entity to compute composite features.
Attributes:
NAME
- The name of the realtime feature component. This is used to identify the realtime feature component.
real_time_features
- A list of all the realtime feature components.
component_registry
- A dictionary that maps the name of the realtime feature component to the realtime feature
def __init__(*upstreams: Component,
output_feature_names: Optional[Set[str]] = None,
required_feature_names: Optional[Set[str]] = None,
name: Optional[str] = None)
Arguments:
name
- Name of the component
output_feature_names
- Features outputted by this real-time feature
@classmethod
def get_type_args_simple(cls, index: int) -> Type
Get the type argument at the given index for the class. This is used to get the primary entity type, secondary entity type, and request entity type.
@classmethod
def get_entity_names(cls, full_feature_name: str) -> Optional[List[str]]
Get the entity identifier type, which will be used as sql column name
full_feature_name is of the form <component_name>:<feature_name>
@classmethod
def get_entity_type_column(cls, full_feature_name: str) -> Optional[str]
Get the entity identifier type, which will be used as sql column name
full_feature_name is of the form <component_name>:<feature_name>
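Since full feature names follow the <component_name>:<feature_name> form, splitting one is a single partition on the separator. The helper below is a hypothetical illustration, not part of the class. It assumes the first colon is the separator, and the example name is made up.

```python
from typing import Tuple


def split_full_feature_name(full_feature_name: str) -> Tuple[str, str]:
    # Split at the first colon only (assumption).
    component_name, separator, feature_name = full_feature_name.partition(":")
    if not separator or not component_name or not feature_name:
        raise ValueError(f"expected <component_name>:<feature_name>, got {full_feature_name!r}")
    return component_name, feature_name


component, feature = split_full_feature_name("RealtimeUserFeature:f_clicks")
```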
def can_execute_on(request: REQUEST_ENTITY,
primary_entity: Optional[PRIMARY_ENTITY],
secondary_entity: Optional[SECONDARY_ENTITY]) -> bool
Checks if the input matches the entity type, so we can execute on it
def set_full_feature_name(
feature_data: Optional[FeatureData]) -> Optional[FeatureData]
Sets the full feature name for the feature data
class FeatureStoreRetrievalRequest(BaseModel)
Request to retrieve features from the feature store.
Attributes:
identifiers
- List of identifiers for which features are to be retrieved.
feature_names
- List of feature names to be retrieved. Feature names are of the form <feature_view_name>:<feature_name>.
class FeatureStoreRetrievalComponent(Component[FeatureStoreRetrievalRequest,
FeatureMap])
Component to retrieve features from the feature store. This component is responsible for fetching features from the feature store and returning them in the form of a FeatureMap. The FeatureMap is a mapping from identifiers to FeatureData. The FeatureData contains the identifier and a mapping from feature names to feature values. The feature names are of the form <feature_view_name>:<feature_name>. The feature values are of type WyvernFeature, which is a union of all the possible feature types. The feature types are defined in wyvern/wyvern_typing.py.
The FeatureStoreRetrievalComponent is a singleton and can be accessed via feature_store_retrieval_component.
The FeatureStoreRetrievalComponent is configured via the following environment variables:
- WYVERN_API_KEY: if you're using Wyvern's feature store, this is the API key for Wyvern
- WYVERN_FEATURE_STORE_URL: url to the feature store
- WYVERN_ONLINE_FEATURES_PATH: url path to the feature store's online features endpoint
- FEATURE_STORE_ENABLED: whether the feature store is enabled or not
async def fetch_features_from_feature_store(
identifiers: List[Identifier], feature_names: List[str]) -> FeatureMap
Fetches features from the feature store for the given identifiers and feature names.
Arguments:
identifiers
- List of identifiers for which features are to be retrieved.
feature_names
- List of feature names to be retrieved.
Returns:
FeatureMap containing the features for the given identifiers and feature names.
@tracer.wrap(name="FeatureStoreRetrievalComponent.execute")
async def execute(input: FeatureStoreRetrievalRequest,
handle_exceptions: bool = False,
**kwargs) -> FeatureMap
Fetches features from the feature store for the given identifiers and feature names. This method is a wrapper around fetch_features_from_feature_store which handles exceptions and returns an empty FeatureMap in case of an exception.
class FeatureRetrievalPipelineRequest(GenericModel, Generic[REQUEST_ENTITY])
This is the input to the FeatureRetrievalPipeline component that is used to retrieve features.
Attributes:
request
- The request that is used to retrieve features. This is used to retrieve the entities and identifiers that are needed to compute the features.
requested_feature_names
- The feature names that are requested. This is used to filter out the real-time features that are calculated instead of retrieved from the feature store. Example: product_fv:FEATURE_PRODUCT_AMOUNT_PAID_LAST_15_DAYS
feature_overrides
- This is used to override the default real-time features.
class FeatureRetrievalPipeline(
Component[FeatureRetrievalPipelineRequest[REQUEST_ENTITY],
FeatureMap], Generic[REQUEST_ENTITY])
This component is used to retrieve features for a given request. It is composed of the following components:
1. FeatureStoreRetrievalComponent: This component is used to retrieve features from the feature store.
2. RealtimeFeatureComponent: This component is used to compute real-time features.
3. FeatureEventLoggingComponent: This component is used to log feature events.
def __init__(*upstreams: Component,
name: str,
handle_exceptions: bool = False)
Arguments:
*upstreams
- The upstream components to this component.
name
- The name of this component.
handle_exceptions
- Whether to handle feature store exceptions. Defaults to False. If True, missing feature values will be None instead of raising exceptions. If False, exceptions will be raised.
@tracer.wrap(name="FeatureRetrievalPipeline.execute")
async def execute(input: FeatureRetrievalPipelineRequest[REQUEST_ENTITY],
**kwargs) -> FeatureMap
This method is used to retrieve features for a given request.
It is composed of the following steps:
0. Figure out which features are real-time features and which features are feature store features.
1. Retrieve features from the feature store.
2. Compute real-time features.
3. Combine the feature store features and real-time features into one FeatureMap.
4. Log the feature values to the feature event logging component.
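The steps above can be sketched with plain dictionaries. This is a simplified illustration, not the pipeline's implementation: a dict of floats stands in for a FeatureMap, fetch_from_store and compute_realtime are hypothetical placeholders that return fixed values, and a list stands in for the feature event logging component.

```python
import asyncio
from typing import Dict, List, Set

FeatureValues = Dict[str, float]  # simplified stand-in for a FeatureMap


async def fetch_from_store(names: List[str]) -> FeatureValues:
    # Hypothetical feature store lookup returning a fixed value.
    return {name: 1.0 for name in names}


async def compute_realtime(names: List[str]) -> FeatureValues:
    # Hypothetical real-time computation returning a fixed value.
    return {name: 2.0 for name in names}


async def retrieve(requested: List[str], realtime_names: Set[str],
                   log: List[str]) -> FeatureValues:
    # Step 0: split the requested names into real-time and feature store features.
    realtime = [name for name in requested if name in realtime_names]
    stored = [name for name in requested if name not in realtime_names]
    # Step 1: retrieve features from the feature store.
    store_values = await fetch_from_store(stored)
    # Step 2: compute real-time features.
    realtime_values = await compute_realtime(realtime)
    # Step 3: combine both sources into one mapping.
    combined = {**store_values, **realtime_values}
    # Step 4: log the feature names that were produced.
    log.extend(sorted(combined))
    return combined


event_log: List[str] = []
features = asyncio.run(
    retrieve(["product_fv:price", "rt:clicks"], {"rt:clicks"}, event_log)
)
```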
class FeatureLogEventData(BaseModel)
Data for a feature event.
Attributes:
feature_identifier
- The identifier of the feature.
feature_identifier_type
- The type of the feature identifier.
feature_name
- The name of the feature.
feature_value
- The value of the feature.
class FeatureEvent(LoggedEvent[FeatureLogEventData])
A feature event.
Attributes:
event_type
- The type of the event. Defaults to EventType.FEATURE.
class FeatureEventLoggingRequest(GenericModel, Generic[REQUEST_ENTITY])
A request to log feature events.
Attributes:
request
- The request to log feature events for.
feature_map
- The feature map to log.
class FeatureEventLoggingComponent(
Component[FeatureEventLoggingRequest[REQUEST_ENTITY],
None], Generic[REQUEST_ENTITY])
A component that logs feature events.
async def execute(input: FeatureEventLoggingRequest[REQUEST_ENTITY],
**kwargs) -> None
Logs feature events.
MODEL_OUTPUT_DATA_TYPE is the type of the output of the model. It can be a float, a string, or a list of floats (e.g. a list of probabilities, embeddings, etc.)
class ModelEventData(BaseModel)
This class defines the data that will be logged for each model event.
Arguments:
model_name
- The name of the model
model_output
- The output of the model
entity_identifier
- The identifier of the entity that was used to generate the model output. This is optional.
entity_identifier_type
- The type of the identifier of the entity that was used to generate the model output. This is optional.
class ModelEvent(LoggedEvent[ModelEventData])
Model event. This is the event that is logged when a model is evaluated.
Arguments:
event_type
- The type of the event. This is always EventType.MODEL.
class ModelOutput(GenericModel, Generic[MODEL_OUTPUT_DATA_TYPE])
This class defines the output of a model.
Arguments:
data
- A dictionary mapping entity identifiers to model outputs. The model outputs can also be None.
model_name
- The name of the model. This is optional.
def get_entity_output(
identifier: Identifier) -> Optional[MODEL_OUTPUT_DATA_TYPE]
Get the model output for a given entity identifier.
Arguments:
identifier
- The identifier of the entity.
Returns:
The model output for the given entity identifier. This can also be None if the model output is None.
class ModelInput(GenericModel, Generic[GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY])
This class defines the input to a model.
Arguments:
request
- The request that will be used to generate the model input.
entities
- A list of entities that will be used to generate the model input.
@property
def first_entity() -> GENERALIZED_WYVERN_ENTITY
Get the first entity in the list of entities. This is useful when you know that there is only one entity.
Returns:
The first entity in the list of entities.
@property
def first_identifier() -> Identifier
Get the identifier of the first entity in the list of entities. This is useful when you know that there is only one entity.
Returns:
The identifier of the first entity in the list of entities.
class ModelComponent(Component[
MODEL_INPUT,
MODEL_OUTPUT,
])
This class defines a model component. A model component is a component that takes in a request and a list of entities and outputs a model output. The model output is a dictionary mapping entity identifiers to model outputs. The model outputs can also be None if the model output is None for a given entity.
@classmethod
def get_type_args_simple(cls, index: int) -> Type
Get the type argument at the given index. This is used to get the model input and model output types.
@cached_property
def manifest_feature_names() -> Set[str]
This function defines which features are necessary for model evaluation
Our system will automatically fetch the required features from the feature store to make this model evaluation possible
async def execute(input: MODEL_INPUT, **kwargs) -> MODEL_OUTPUT
The model_name and model_score will be automatically logged
async def batch_inference(
request: BaseWyvernRequest, entities: List[Union[WyvernEntity,
BaseWyvernRequest]]
) -> Sequence[Optional[Union[float, str, List[float]]]]
Define your model inference in a batched manner so that it's easier to boost inference speed
async def inference(input: MODEL_INPUT, **kwargs) -> MODEL_OUTPUT
The inference function is the main entrance to model evaluation.
By default, the base ModelComponent slices entities into smaller batches and calls batch_inference on each batch.
The default batch size is 30. You can set the MODEL_BATCH_SIZE environment variable to change the batch size.
In order to set up model inference, you only need to define a class that inherits ModelComponent and implement batch_inference.
You can also override this function if you want to customize the inference logic.
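The default slicing behaviour described above can be sketched as follows. This is not the base class's code: the standalone batch_inference here is a hypothetical model that scores an entity by the length of its ID, and running the batches concurrently with asyncio.gather is an assumption.

```python
import asyncio
from typing import List, Optional, Sequence

BATCH_SIZE = 30  # the default batch size stated above


async def batch_inference(entities: List[str]) -> Sequence[Optional[float]]:
    # Hypothetical model: the score is the length of the entity ID.
    return [float(len(entity)) for entity in entities]


async def inference(entities: List[str],
                    batch_size: int = BATCH_SIZE) -> List[Optional[float]]:
    # Slice the entities into consecutive batches of at most `batch_size`.
    batches = [entities[i:i + batch_size]
               for i in range(0, len(entities), batch_size)]
    # Run every batch (concurrently here, an assumption) and keep input order.
    results = await asyncio.gather(*(batch_inference(batch) for batch in batches))
    return [score for batch in results for score in batch]


entities = [f"product_{i}" for i in range(65)]
scores = asyncio.run(inference(entities))
```

With 65 entities and a batch size of 30, the sketch issues three batch calls (30, 30 and 5 entities) and flattens the results back into one list in the original order.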
class ModelbitComponent(ModelComponent[MODEL_INPUT, MODEL_OUTPUT])
ModelbitComponent is a base class for all modelbit model components. It provides a common interface to implement all modelbit models.
ModelbitComponent is a subclass of ModelComponent.
Attributes:
AUTH_TOKEN
- A class variable that stores the auth token for Modelbit.
URL
- A class variable that stores the url for Modelbit.
def __init__(*upstreams,
name: Optional[str] = None,
auth_token: Optional[str] = None,
url: Optional[str] = None) -> None
Arguments:
*upstreams
- A list of upstream components.
name
- A string that represents the name of the model.
auth_token
- A string that represents the auth token for Modelbit.
url
- A string that represents the url for Modelbit.
Raises:
WyvernModelbitTokenMissingError
- If the auth token is not provided.
@cached_property
def modelbit_features() -> List[str]
This is a cached property that returns a list of modelbit features. This method should be implemented by the subclass.
@cached_property
def manifest_feature_names() -> Set[str]
This is a cached property that returns a set of manifest feature names. This method wraps around the modelbit_features property.
async def build_requests(
input: MODEL_INPUT) -> Tuple[List[Identifier], List[Any]]
Please refer to the Modelbit batch inference API: https://doc.modelbit.com/deployments/rest-api/
async def inference(input: MODEL_INPUT, **kwargs) -> MODEL_OUTPUT
This method sends a request to Modelbit and returns the output.
class APIRouteComponent(Component[REQUEST_SCHEMA, RESPONSE_SCHEMA])
APIRouteComponent is the base class for all the API routes in Wyvern. It is a Component that takes in a request schema and a response schema. It is responsible for hydrating the request data with Wyvern Index data and then passing the hydrated data to the next component in the pipeline.
The APIRouteComponent is also responsible for the API routing, which means it is responsible for the API versioning and the API path.
Attributes:
API_VERSION
- the version of the API. This is used in the API routing. The default value is "v1".
PATH
- the path of the API. This is used in the API routing.
REQUEST_SCHEMA_CLASS
- the class of the request schema. This is used to validate the request data.
RESPONSE_SCHEMA_CLASS
- the class of the response schema. This is used to validate the response data.
API_NAME
- the name of the API. This is used in the API routing. If not provided, the name of the APIRouteComponent will be used.
async def warm_up(input: REQUEST_SCHEMA) -> None
This is the warm-up function that is called before the API route is called.
@tracer.wrap(name="APIRouteComponent.hydrate")
async def hydrate(input: REQUEST_SCHEMA) -> None
The Wyvern APIRouteComponent recursively hydrates the request input data with Wyvern Index data
TODO: this function could be moved to a global place
class BoostingBusinessLogicComponent(
BusinessLogicComponent[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY])
A component that performs boosting on an entity with a set of candidates. The boosting can be multiplicative or additive.
The request itself could contain more than just entities, for example it may contain a query and so on
def boost(
scored_candidates: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
entity_keys: Set[str],
boost: float,
entity_key_mapping: Callable[
[GENERALIZED_WYVERN_ENTITY],
str,
] = lambda candidate: candidate.identifier.identifier,
multiplicative=False
) -> List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]]
Boosts the score of each candidate by a certain factor
Arguments:
scored_candidates
- The list of scored candidates
entity_keys
- The set of entity keys (unique identifiers) to boost
boost
- The boost factor
entity_key_mapping
- A lambda function that takes in a candidate entity and returns the field we should apply the boost to
multiplicative
- Whether to apply the boost with multiplication or addition - true indicates it is multiplication and false indicates it is addition
Returns:
The list of scored candidates with the boost applied
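The difference between additive and multiplicative boosting is easiest to see on a small example. The sketch below is a simplified illustration and not the component's method: ScoredCandidateSketch and apply_boost are hypothetical names, and candidates are keyed by a plain string identifier instead of going through entity_key_mapping.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class ScoredCandidateSketch:
    identifier: str
    score: float


def apply_boost(candidates: List[ScoredCandidateSketch], entity_keys: Set[str],
                boost: float, multiplicative: bool = False) -> List[ScoredCandidateSketch]:
    boosted = []
    for candidate in candidates:
        new_score = candidate.score
        if candidate.identifier in entity_keys:
            # Multiply or add depending on the flag; other candidates are unchanged.
            new_score = candidate.score * boost if multiplicative else candidate.score + boost
        boosted.append(ScoredCandidateSketch(candidate.identifier, new_score))
    return boosted


candidates = [ScoredCandidateSketch("p1", 1.0), ScoredCandidateSketch("p2", 2.0)]
additive = apply_boost(candidates, {"p1"}, boost=0.5)
multiplied = apply_boost(candidates, {"p1"}, boost=2.0, multiplicative=True)
```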
class CSVBoostingBusinessLogicComponent(
BoostingBusinessLogicComponent[GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY],
Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY])
This component reads a csv file and applies the boost based on specific column name, entity key, and score combinations
Methods to define: Given a CSV row, generate the entity key and boost value
Arguments:
csv_file
- The path to the CSV file
multiplicative
- Whether to apply the boost with multiplication or addition - true indicates it is multiplication and false indicates it is addition
async def initialize() -> None
Reads the CSV file and populates the lookup table
@abstractmethod
async def extract_keys_from_csv_row(row: Series) -> str
Given a CSV row, generate the unique combinations that would apply a boost
For example, in a file with the columns product_id, query, boost, the method would return a unique concatenation (i.e. product_id:query)
@abstractmethod
async def extract_boost_value_from_csv_row(row: Series) -> float
Given a CSV row, return the boost value to apply
For example, in a file with the columns product_id, query, boost, the method would return the boost value
@abstractmethod
async def extract_key_from_request_entity(candidate: GENERALIZED_WYVERN_ENTITY,
request: REQUEST_ENTITY) -> str
Given a candidate and a request, generate a unique key that would apply a boost
async def execute(
input: BusinessLogicRequest[
GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY,
], **kwargs) -> List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]]
Boosts the score of each candidate by a certain factor
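The lookup-table idea behind this component can be sketched as follows. This is an assumption-laden illustration: it uses the standard-library `csv` module rather than pandas rows, and `key_from_row`, `boost_from_row`, and `build_lookup` are hypothetical helpers that only mirror the roles of the abstract methods above.

```python
import csv
import io
from typing import Dict

# Hypothetical CSV contents using the columns from the example above.
CSV_TEXT = """product_id,query,boost
p1,candle,2.0
p2,wax,0.5
"""


def key_from_row(row: Dict[str, str]) -> str:
    """Concatenate the columns that identify a boost (product_id:query)."""
    return f"{row['product_id']}:{row['query']}"


def boost_from_row(row: Dict[str, str]) -> float:
    """Read the boost value from the row."""
    return float(row["boost"])


def build_lookup(csv_text: str) -> Dict[str, float]:
    """Populate a key -> boost lookup table from the CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {key_from_row(row): boost_from_row(row) for row in reader}


lookup = build_lookup(CSV_TEXT)
# At request time, the key built from (candidate, request) is looked up.
boost = lookup.get("p1:candle")
```

A key built from the candidate and the request that is absent from the table simply yields no boost.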
class BusinessLogicEventData(EntityEventData)
The data associated with a business logic event
Arguments:
business_logic_pipeline_order
- The order of the business logic pipeline that this event occurred in
business_logic_name
- The name of the business logic component that this event occurred in
old_score
- The old score of the entity
new_score
- The new score of the entity
class BusinessLogicEvent(LoggedEvent[BusinessLogicEventData])
An event that occurs in the business logic layer
class BusinessLogicRequest(GenericModel, Generic[GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY])
A request to the business logic layer to perform business logic on a set of candidates
Arguments:
request
- The request that the business logic layer is being asked to perform business logic on
scored_candidates
- The candidates that the business logic layer is being asked to perform business logic on
class BusinessLogicResponse(GenericModel, Generic[GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY])
The response from the business logic layer after performing business logic on a set of candidates
Arguments:
request
- The request that the business logic layer was asked to perform business logic on
adjusted_candidates
- The candidates that the business logic layer performed business logic on
class BusinessLogicComponent(Component[
BusinessLogicRequest[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
], Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY])
A component that performs business logic on an entity with a set of candidates
The request itself could contain more than just entities, for example it may contain a query and so on
class BusinessLogicPipeline(Component[
BusinessLogicRequest[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
BusinessLogicResponse[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
], Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY])
Steps through a series of business logic components and returns the final output
This operation is fully chained, meaning that the output of each business logic component is passed as an input to the next business logic component
@tracer.wrap(name="BusinessLogicPipeline.execute")
async def execute(
input: BusinessLogicRequest[GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY], **kwargs
) -> BusinessLogicResponse[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY]
Executes the business logic pipeline on the inputted candidates
Arguments:
input
- The input to the business logic pipeline
Returns:
The output of the business logic pipeline
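The "fully chained" behaviour can be pictured with a small sketch in which each step receives the previous step's output. The `Scores` alias and the `add_one`, `double`, and `run_chain` functions are hypothetical simplifications, not Wyvern APIs.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# Hypothetical simplification: candidates are a mapping of entity key to score.
Scores = Dict[str, float]
Step = Callable[[Scores], Awaitable[Scores]]


async def add_one(scores: Scores) -> Scores:
    return {key: value + 1.0 for key, value in scores.items()}


async def double(scores: Scores) -> Scores:
    return {key: value * 2.0 for key, value in scores.items()}


async def run_chain(steps: List[Step], scores: Scores) -> Scores:
    """Feed the output of each step into the next one, in order."""
    for step in steps:
        scores = await step(scores)
    return scores


result = asyncio.run(run_chain([add_one, double], {"p1": 1.0}))
reordered = asyncio.run(run_chain([double, add_one], {"p1": 1.0}))
```

Because the steps are chained, their order matters: the two runs above produce different final scores.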
def extract_business_logic_events(
output: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
pipeline_index: int, upstream_name: str, request_id: str,
old_scores: List[float]) -> List[BusinessLogicEvent]
Extracts the business logic events from the output of a business logic component
Arguments:
output
- The output of a business logic component
pipeline_index
- The index of the business logic component in the business logic pipeline
upstream_name
- The name of the business logic component
request_id
- The request id of the request that the business logic component was called in
old_scores
- The old scores of the candidates that the business logic component was called on
Returns:
The business logic events that were extracted from the output of the business logic component
class PinningBusinessLogicComponent(
BusinessLogicComponent[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY])
A component that performs pinning on an entity with a set of candidates
The request itself could contain more than just entities, for example it may contain a query and so on
def pin(
scored_candidates: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
entity_pins: Dict[str, int],
entity_key_mapping: Callable[
[GENERALIZED_WYVERN_ENTITY],
str,
] = lambda candidate: candidate.identifier.identifier,
allow_down_ranking: bool = False
) -> List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]]
Pins the supplied entity to the specific position
Arguments:
scored_candidates
- The list of scored candidates
entity_pins
- The map of entity keys (unique identifiers) to pin, and their pinning position
entity_key_mapping
- A lambda function that takes in a candidate entity and returns the field we should apply the pin to
allow_down_ranking
- Whether to allow down-ranking of candidates that are not pinned
Returns:
The list of scored candidates with the pinned entities
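One way to picture pinning is as a reordering step. The sketch below is not the Wyvern implementation: it assumes zero-based positions, works on bare keys instead of scored candidates, and does not model `allow_down_ranking`. `pin_positions` is a hypothetical helper.

```python
from typing import Dict, List


def pin_positions(ordered_keys: List[str], entity_pins: Dict[str, int]) -> List[str]:
    """Move each pinned key to its requested (assumed zero-based) position."""
    # Keys without a pin keep their relative order.
    result = [key for key in ordered_keys if key not in entity_pins]
    # Insert pinned keys from the lowest target position upward, so that
    # earlier inserts are not shifted by later ones.
    pinned = sorted(
        (position, key)
        for key, position in entity_pins.items()
        if key in ordered_keys
    )
    for position, key in pinned:
        result.insert(min(position, len(result)), key)
    return result


ranked = ["a", "b", "c", "d"]
pinned_order = pin_positions(ranked, {"d": 0, "a": 2})
```

A real component would also need to adjust scores so that a later sort preserves the pinned order; that part is omitted here.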
class RankingRequest(BaseWyvernRequest, PaginationFields,
Generic[WYVERN_ENTITY])
This is the request for the ranking pipeline.
Attributes:
query
- the query entity
candidates
- the list of candidate entities
class ResponseCandidate(BaseModel)
This is the response candidate.
Attributes:
candidate_id
- the identifier of the candidate
ranked_score
- the ranked score of the candidate
class RankingResponse(BaseModel)
This is the response for the ranking pipeline.
Attributes:
ranked_candidates
- the list of ranked candidates
events
- the list of logged events
class RankingPipeline(PipelineComponent[RankingRequest, RankingResponse],
Generic[WYVERN_ENTITY])
This is the ranking pipeline.
Attributes:
PATH
- the path of the API. This is used in the API routing. The default value is "/ranking".
def get_model() -> ModelComponent
This is the ranking model.
The model input should be a subclass of ModelInput. Its output should be scored candidates
def get_business_logic() -> Optional[BusinessLogicPipeline]
This is the business logic pipeline. It is optional. If not provided, the ranking pipeline will not apply any business logic.
The business logic pipeline should be a subclass of BusinessLogicPipeline. Some examples of business logic for ranking pipeline are:
- Deduplication
- Filtering
- (De)boosting
async def rank_candidates(
request: RankingRequest[WYVERN_ENTITY]
) -> List[ScoredCandidate[WYVERN_ENTITY]]
This function ranks the candidates.
- It first calls the ranking model to get the model scores for the candidates.
- It then calls the business logic pipeline to adjust the model scores.
- It returns the adjusted candidates.
Arguments:
request
- the ranking request
Returns:
A list of ScoredCandidate
class IndexUploadComponent(APIRouteComponent[IndexRequest, IndexResponse])
async def execute(input: IndexRequest, **kwargs) -> IndexResponse
bulk index entities with redis pipeline
class ImpressionEventData(EntityEventData)
Impression event data. This is the data that is logged for each impression.
Arguments:
impression_score
- The score of the impression.
impression_order
- The order of the impression.
class ImpressionEvent(LoggedEvent[ImpressionEventData])
Impression event. This is the event that is logged for each impression.
Arguments:
event_type
- The type of the event. This is always EventType.IMPRESSION.
class ImpressionEventLoggingRequest(GenericModel,
Generic[GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY])
Impression event logging request.
Arguments:
request
- The request that was made.
scored_impressions
- The scored impressions. This is a list of scored candidates. Each scored candidate has an entity and a score.
class ImpressionEventLoggingComponent(Component[
ImpressionEventLoggingRequest[GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY],
None,
], Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY])
Impression event logging component. This component logs impression events.
@tracer.wrap(name="ImpressionEventLoggingComponent.execute")
async def execute(input: ImpressionEventLoggingRequest[
GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY], **kwargs) -> None
Logs impression events.
Arguments:
input
- The input to the component. This contains the request and the scored impressions.
Returns:
None
class EventType(str, Enum)
Enum for the different types of events that can be logged.
class LoggedEvent(GenericModel, Generic[EVENT_DATA])
Base class for all logged events.
Attributes:
request_id
- The request ID of the request that triggered the event.
api_source
- The API source of the request that triggered the event.
event_timestamp
- The timestamp of the event.
event_type
- The type of the event.
event_data
- The data associated with the event. This is a generic type that can be any subclass of BaseModel.
class EntityEventData(BaseModel)
Base class for all entity event data.
Attributes:
entity_identifier
- The identifier of the entity that the event is associated with.
entity_identifier_type
- The type of the entity identifier.
class CustomEvent(LoggedEvent[ENTITY_EVENT_DATA_TYPE])
Class for custom events. Custom event data must be a subclass of EntityEventData.
Attributes:
event_type
- The type of the event. This is always EventType.CUSTOM.
class ComponentStatus(str, Enum)
This enum defines the status of the component.
class Component(Generic[INPUT_TYPE, OUTPUT_TYPE])
Component is the base class for all the components in Wyvern. It is a generic class that takes in the input type and the output type of the component.
It is responsible for:
1. Initializing the component
2. Initializing the upstream components
async def initialize() -> None
This is the place where you can do some initialization work for your component
As an example, you can initialize a model here or load a file, which is needed for your component to work
async def initialize_wrapper() -> None
Extend this method if your component has some work that needs to be done on server startup
This is a great place to initialize clients for external libraries, warm up models, etc
This runs after all objects have been constructed
async def execute(input: INPUT_TYPE, **kwargs) -> OUTPUT_TYPE
The core logic of the component. Every custom component has to implement this method
If your component has a complex input data structure, make sure to override this method in order to construct your input data from the upstream components' output data
upstream_outputs contains data that was parsed by upstreams
@cached_property
def manifest_feature_names() -> Set[str]
This function defines which features are required for this component to work
Our system will automatically fetch the required features from the feature store to make this model evaluation possible
def get_feature(identifier: Identifier, feature_name: str) -> WyvernFeature
This function gets the feature value for the given identifier. The features are cached once fetched/evaluated.
A feature that lives in the feature store should be requested by its feature name alone, without the "feature_view:" prefix. For example, if you have a feature view "fv" and a feature "wyvern_feature", then you would have defined "fv:wyvern_feature" in manifest_feature_names. However, when you fetch the feature value with this function, you just have to pass in feature_name="wyvern_feature".
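The naming rule above can be illustrated with a tiny helper. `store_feature_name` is hypothetical and not part of Wyvern; it only shows how the manifest name relates to the name passed to `get_feature`.

```python
def store_feature_name(manifest_name: str) -> str:
    """Drop the "feature_view:" prefix, if present (hypothetical helper)."""
    # rpartition returns ("", "", name) when there is no ":" in the string.
    _, _, name = manifest_name.rpartition(":")
    return name


lookup_name = store_feature_name("fv:wyvern_feature")
```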
def get_all_features(identifier: Identifier) -> Dict[str, WyvernFeature]
This function gets all features for the given identifier. The features are cached once fetched/evaluated.
class PipelineComponent(APIRouteComponent[REQUEST_ENTITY, RESPONSE_SCHEMA])
PipelineComponent is the base class for all the pipeline components in Wyvern. It is a Component that takes in a request entity and a response schema, and it is responsible for hydrating the request data with Wyvern Index data and then passing the hydrated data to the next component in the pipeline.
@cached_property
def realtime_features_overrides() -> Set[Type[RealtimeFeatureComponent]]
This function defines the set of RealtimeFeatureComponents that generate features with non-deterministic feature names. For example, feature names like matched_query_brand. That feature is defined like matched_query_{input.query.matched_query}, so it can refer to 10 or 20 features
async def retrieve_features(request: REQUEST_ENTITY) -> None
TODO shu: it doesn't support feature overrides. Write code to support that
class CosineSimilarityComponent(Component[List[Tuple[List[float],
List[float]]],
List[float]])
A component that computes cosine similarity in parallel for all pairs of embeddings.
async def execute(input: List[Tuple[List[float], List[float]]],
**kwargs) -> List[float]
Computes cosine similarity in parallel for all pairs of embeddings.
Arguments:
input
- List of tuples of embeddings to compute cosine similarity for.
Returns:
List of cosine similarities.
async def cosine_similarity(embedding_1: List[float],
embedding_2: List[float]) -> float
Computes cosine similarity between two embeddings.
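For reference, cosine similarity is the dot product of two vectors divided by the product of their Euclidean norms. The sketch below is a plain synchronous version, not the component's code; the parallel dispatch is omitted, and returning 0.0 for a zero-length vector is an assumption of this example.

```python
import math
from typing import List, Tuple


def cosine_similarity(embedding_1: List[float], embedding_2: List[float]) -> float:
    """Dot product divided by the product of the Euclidean norms."""
    dot = sum(x * y for x, y in zip(embedding_1, embedding_2))
    norm_1 = math.sqrt(sum(x * x for x in embedding_1))
    norm_2 = math.sqrt(sum(y * y for y in embedding_2))
    if norm_1 == 0.0 or norm_2 == 0.0:
        # Assumption of this sketch: a zero vector has similarity 0.
        return 0.0
    return dot / (norm_1 * norm_2)


pairs: List[Tuple[List[float], List[float]]] = [
    ([1.0, 0.0], [1.0, 0.0]),  # identical direction
    ([1.0, 0.0], [0.0, 1.0]),  # orthogonal
    ([1.0, 0.0], [-1.0, 0.0]),  # opposite direction
]
similarities = [cosine_similarity(a, b) for a, b in pairs]
```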
class SortingComponent(Component[
List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
])
Sorts a list of candidates based on a score.
async def execute(
input: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
descending=True,
**kwargs) -> List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]]
Sorts a list of candidates based on a score.
Arguments:
input
- A list of candidates to be sorted. Each candidate must have a score.
descending
- Whether to sort in descending order. Defaults to True.
Returns:
A sorted list of candidates.
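The effect of the `descending` flag can be sketched with plain tuples. The `(key, score)` pairs and the `sort_candidates` helper are hypothetical stand-ins for scored candidates and the component.

```python
from typing import List, Tuple

# Hypothetical stand-in for scored candidates: (entity key, score) pairs.
Scored = Tuple[str, float]


def sort_candidates(candidates: List[Scored], descending: bool = True) -> List[Scored]:
    """Return a new list ordered by score."""
    return sorted(candidates, key=lambda candidate: candidate[1], reverse=descending)


candidates = [("a", 0.2), ("b", 0.9), ("c", 0.5)]
best_first = sort_candidates(candidates)
worst_first = sort_candidates(candidates, descending=False)
```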
class WyvernRedis()
WyvernRedis is a wrapper for redis client to help index your entities in redis with Wyvern's convention
def __init__(scope: str = "",
redis_host: Optional[str] = None,
redis_port: Optional[int] = None) -> None
scope is used to prefix the redis key. You can use the environment variable PROJECT_NAME to set the scope.
async def get_entity(entity_type: str,
entity_id: str) -> Optional[Dict[str, Any]]
get entity from redis
async def get_entities(
entity_type: str,
entity_ids: Sequence[str]) -> List[Optional[Dict[str, Any]]]
get entities from redis
async def delete_entity(entity_type: str, entity_id: str) -> None
delete entity from redis
async def delete_entities(entity_type: str, entity_ids: Sequence[str]) -> None
delete entities from redis
def setup_tracing()
Setup tracing for Wyvern service. Tracing is disabled in development mode and for healthcheck requests.
def setup_logging()
Setup logging configuration by loading from log_config.yml file. Logs an error if the file cannot be found or loaded and uses default logging configuration.
class WyvernError(Exception)
Base class for all Wyvern errors.
Attributes:
message
- The error message.
error_code
- The error code.
class WyvernEntityValidationError(WyvernError)
Raised when entity data is invalid
class PaginationError(WyvernError)
Raised when there is an error in pagination
class WyvernRouteRegistrationError(WyvernError)
Raised when there is an error in registering a route
class ComponentAlreadyDefinedInPipelineComponentError(WyvernError)
Raised when a component is already defined in a pipeline component
class WyvernFeatureStoreError(WyvernError)
Raised when there is an error in feature store
class WyvernFeatureNameError(WyvernError)
Raised when there is an error in feature name
class WyvernModelInputError(WyvernError)
Raised when there is an error in model input
class WyvernModelbitTokenMissingError(WyvernError)
Raised when modelbit token is missing
class WyvernModelbitValidationError(WyvernError)
Raised when modelbit validation fails
class WyvernAPIKeyMissingError(WyvernError)
Raised when api key is missing
class ExperimentationProviderNotSupportedError(WyvernError)
Raised when experimentation provider is not supported
class ExperimentationClientInitializationError(WyvernError)
Raised when experimentation client initialization fails
class KinesisFirehoseStream(str, Enum)
Enum for Kinesis Firehose stream names
Usage:
>>> KinesisFirehoseStream.EVENT_STREAM.get_stream_name()
def get_stream_name(customer_specific: bool = True,
env_specific: bool = True) -> str
Returns the stream name for the given stream
Arguments:
customer_specific
- Whether the stream name should be customer specific
env_specific
- Whether the stream name should be environment specific
Returns:
The stream name
class WyvernKinesisFirehose()
Wrapper around boto3 Kinesis Firehose client
def put_record_batch_callable(
stream_name: KinesisFirehoseStream,
record_generator: List[Callable[[], List[BaseModel]]])
Puts records to the given stream. This is a callable that can be used with FastAPI's BackgroundTasks. This way events can be logged asynchronously after the response is sent to the client.
Arguments:
stream_name
KinesisFirehoseStream - The stream to put records to
record_generator
List[Callable[[], List[BaseModel]]] - A list of functions that return a list of records
Returns:
None
def put_record_batch(stream_name: KinesisFirehoseStream,
records: List[BaseModel])
Puts records to the given stream
Arguments:
stream_name
KinesisFirehoseStream - The stream to put records to
records
List[BaseModel] - A list of records
Returns:
None
def log_events(event_generator: Callable[[], List[LoggedEvent]])
Logs events to the current request context.
Arguments:
event_generator
- A function that returns a list of events to be logged.
def get_logged_events() -> List[LoggedEvent[Any]]
Returns:
A list of all the events logged in the current request context.
def get_logged_events_generator(
) -> List[Callable[[], List[LoggedEvent[Any]]]]
Returns:
A list of all the event generators logged in the current request context.
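The generator-based logging functions above defer building events until they are actually needed. A minimal sketch of that pattern follows, using a module-level list as a hypothetical stand-in for the request context. The `log_events` here is a simplified local function, not the Wyvern one, and `flush_events` is invented for illustration.

```python
from typing import Callable, List

# Hypothetical stand-in for the per-request context.
event_generators: List[Callable[[], List[str]]] = []
calls: List[str] = []


def log_events(event_generator: Callable[[], List[str]]) -> None:
    """Store the generator; nothing is built yet."""
    event_generators.append(event_generator)


def flush_events() -> List[str]:
    """Call every stored generator and collect the events."""
    events: List[str] = []
    for generator in event_generators:
        events.extend(generator())
    return events


def impression_events() -> List[str]:
    calls.append("built")  # records when the events are actually built
    return ["impression:p1", "impression:p2"]


log_events(impression_events)
built_before_flush = list(calls)  # still empty: the generator has not run
flushed = flush_events()
```

Registering a function instead of a list keeps event construction off the request's critical path, which fits the background-task usage described for `put_record_batch_callable`.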
def log_custom_events(events: List[ENTITY_EVENT_DATA_TYPE]) -> None
Logs custom events to the current request context.
Arguments:
events
- A list of custom events to be logged.
@asynccontextmanager
async def lifespan(app: FastAPI)
A context manager that starts and stops with the app. This is used to start and stop the aiohttp client.
class WyvernFastapi()
A wrapper around FastAPI that provides a few additional features:
- A healthcheck endpoint
- A request middleware that logs the request and response payloads
- A request middleware that sets the WyvernRequest in the request context
- Auto registration of routes from APIRouteComponent subclasses
endpoint input:
- the built WyvernPipeline
- the request input schema
- the request output schema
async def register_route(route_component: Type[APIRouteComponent]) -> None
Register a route component. This will register the route with FastAPI and also initialize the route component.
Arguments:
route_component
- The route component to register.
Raises:
WyvernRouteRegistrationError
- If the route component is not a subclass of APIRouteComponent.
@dataclass
class WyvernRequest()
WyvernRequest is a dataclass that represents a request to the Wyvern service. It is used to pass information between the various components of the Wyvern service.
Attributes:
method
- The HTTP method of the request
url
- The full URL of the request
url_path
- The path of the URL of the request
json
- The JSON body of the request, represented by pydantic model
headers
- The headers of the request
entity_store
- A dictionary that can be used to store entities that are created during the request
events
- A list of functions that return a list of LoggedEvents. These functions are called at the end of the request to log events to the event store
feature_map
- A FeatureMap that can be used to store features that are created during the request
request_id
- The request ID of the request
@classmethod
def parse_fastapi_request(cls,
json: BaseModel,
req: fastapi.Request,
request_id: Optional[str] = None) -> WyvernRequest
Parses a FastAPI request into a WyvernRequest
Arguments:
json
- The JSON body of the request, represented by pydantic model
req
- The FastAPI request
request_id
- The request ID of the request
Returns:
A WyvernRequest
class GetOnlineFeaturesRequest(BaseModel)
Request object for getting online features.
Attributes:
entities
- A dictionary of entity name to entity value.
features
- A list of feature names.
full_feature_names
- A boolean indicating whether to return full feature names. If True, the feature names will be returned in the format <feature_view_name>__<feature_name>. If False, only the feature names will be returned.
class GetHistoricalFeaturesRequest(BaseModel)
Request object for getting historical features.
Attributes:
entities
- A dictionary of entity name to entity value.
timestamps
- A list of timestamps. Used to retrieve historical features at specific timestamps. If not provided, the latest feature values will be returned.
features
- A list of feature names.
class GetFeastHistoricalFeaturesRequest(BaseModel)
Request object for getting historical features from Feast.
Attributes:
full_feature_names
- A boolean indicating whether to return full feature names. If True, the feature names will be returned in the format <feature_view_name>__<feature_name>. If False, only the feature names will be returned.
entities
- A dictionary of entity name to entity value.
features
- A list of feature names.
class GetHistoricalFeaturesResponse(BaseModel)
Response object for getting historical features.
Attributes:
results
- A list of dictionaries containing feature values.
class MaterializeRequest(BaseModel)
Request object for materializing feature views.
Attributes:
end_date
- The end date of the materialization window. Defaults to the current time.
feature_views
- A list of feature view names to materialize. If not provided, all feature views will be materialized.
start_date
- The start date of the materialization window. Defaults to None, which will use the start date of the feature view.
class RequestEntityIdentifierObjects(BaseModel)
Request object for getting entity identifier objects.
Attributes:
request_ids
- A list of request IDs.
entity_identifiers
- A list of entity identifiers.
feature_names
- A list of feature names.
def generate_wyvern_store_app(path: str) -> FastAPI
Generate a FastAPI app for Wyvern feature store.
Arguments:
path
- Path to the feature store repo.
Returns:
FastAPI app.
def start_wyvern_store(path: str, host: str, port: int)
Start the Wyvern feature store.
Arguments:
path
- Path to the feature store repo.
host
- Host to run the feature store on.
port
- Port to run the feature store on.
def separate_real_time_features(
full_feature_names: Optional[List[str]]
) -> Tuple[List[str], List[str]]
Given a list of full feature names, separate real-time features and other features.
Arguments:
full_feature_names
- a list of full feature names.
Returns:
Real time feature names and other feature names in two lists respectively.
def build_historical_real_time_feature_requests(
full_feature_names: List[str], request_ids: List[str],
entities: Dict[str,
List[Any]]) -> Dict[str, RequestEntityIdentifierObjects]
Build historical real-time feature requests grouped by entity types so that we can process them in parallel.
Arguments:
full_feature_names
- a list of full feature names.
request_ids
- a list of request ids.
entities
- a dictionary of entity names and their values.
Returns:
A dictionary of entity types and their corresponding requests.
def process_historical_real_time_features_requests(
requests: Dict[str, RequestEntityIdentifierObjects]
) -> Dict[str, pd.DataFrame]
Given a dictionary of historical real-time feature requests, process them and return the results.
Arguments:
requests
- a dictionary of entity types and their corresponding requests.
Returns:
A dictionary of entity types and their corresponding results in pandas dataframes.
def process_historical_real_time_features_request(
entity_identifier_type: str, request: RequestEntityIdentifierObjects,
context: SnowflakeConnection) -> pd.DataFrame
Given a historical real-time feature request, process it and return the results.
Arguments:
entity_identifier_type
- the entity type of the request. E.g. "product__query"
request
- the request object.
context
- the snowflake connection context.
Returns:
The result in pandas dataframe.
def group_realtime_features_by_entity_type(
full_feature_names: List[str]) -> Dict[str, List[str]]
Given a list of feature names, group them by their entity_identifier_type
Arguments:
full_feature_names
- a list of full feature names.
Returns:
A dictionary of entity types and their corresponding feature names.
def group_registry_features_by_entities(
full_feature_names: List[str],
store: FeatureStore) -> Dict[str, List[str]]
Given a list of feature names, group them by their entity name.
Arguments:
full_feature_names
- a list of full feature names.
store
- the feast feature store.
Returns:
A dictionary of entity names and their corresponding feature names.
def build_historical_registry_feature_requests(
store: FeatureStore, feature_names: List[str],
entity_values: Dict[str, List[Any]],
timestamps: List[datetime]) -> List[GetFeastHistoricalFeaturesRequest]
Build historical feature requests grouped by entity names so that we can process them in parallel.
Arguments:
store
- the feast feature store.
feature_names
- a list of feature names.
entity_values
- a dictionary of entity names and their values.
timestamps
- a list of timestamps for getting historical features at those timestamps.
Returns:
A list of historical feature requests.
def process_historical_registry_features_requests(
store: FeatureStore, requests: List[GetFeastHistoricalFeaturesRequest]
) -> List[pd.DataFrame]
Given a list of historical feature requests, process them and return the results
Arguments:
store
- the feast feature store.
requests
- a list of historical feature requests.
Returns:
A list of results in pandas dataframes.
def process_historical_registry_features_request(
store: FeatureStore,
request: GetFeastHistoricalFeaturesRequest) -> pd.DataFrame
Given a historical feature request, process it and return the results
Arguments:
store
- the feast feature store.
request
- a historical feature request.
Returns:
The result in pandas dataframe.
class SortEnum(str, Enum)
Enum for sort order.
class Sort(BaseModel)
Sort class for sorting the results.
Attributes:
sort_key
- The key to sort on.
sort_field
- The field to sort on.
sort_order
- The order to sort on. Defaults to desc.
def feature_map_join(*feature_maps: FeatureMap) -> FeatureMap
Joins multiple feature maps into a single feature map. Used to join feature maps from different sources.
def feature_map_create(*feature_data: Optional[FeatureData]) -> FeatureMap
Creates a feature map from a list of feature data. Used to create feature maps from different sources.
class WyvernDataModel(BaseModel)
WyvernDataModel is a base class for all data models that could be hydrated from Wyvern Index.
Attributes:
_all_entities
- a list of all the entities under the tree
_all_identifiers
- a list of all the identifiers under the tree
def index_fields() -> List[str]
This method returns a list of fields that contains indexable data
def get_all_entities(cached: bool = True) -> List[WyvernEntity]
This method returns all of the entities associated with subclasses of this
If cached is True, all the nodes under the tree will be cached
def get_all_identifiers(cached: bool = True) -> List[Identifier]
This method generally returns all of the identifiers associated with subclasses of this
Example: You create a QueryProductEntity with query="test" and product_id="1234". It subclasses QueryEntity and ProductEntity, which both have an identifier. This method will return a list of both of those identifiers.
Example: You create a ProductSearchRankingRequest with query="test", candidates=["1234", ...], user="u_1234". This method will return the user and query identifiers. It will also return the identifiers for each candidate (thanks to the implementation in CandidateEntity).
Note: While this checks for WyvernEntity, a WyvernDataModel can have many entities within it; it itself may not be an entity.
def nested_hydration() -> Dict[str, str]
A dictionary that maps the entity id field name to the nested entity field name
TODO: [SHU] replace this mapping by introducing class WyvernField(pydantic.Field) to represent the "entity id field", which will reference the nested entity field name
class WyvernEntity(WyvernDataModel)
WyvernEntity is a base class for all entities that have primary identifier. An entity is the basic unit of data that could be indexed and queried.
@property
def identifier() -> Identifier
This method returns the identifier for this entity
def load_fields(data: Dict[str, Any]) -> None
This method loads the entity with the given data. The returned data is the nested entities that need to be further hydrated
For example, suppose a Product contains these two fields: brand_id: Optional[str] and brand: Optional[Brand], where brand is the hydrated entity. We fetch the brand_id for the product from Wyvern Index as the first hydration step for the Product entity, then we fetch the brand entity from Wyvern Index as the second hydration step
class QueryEntity(WyvernEntity)
QueryEntity is a base class for all entities that have query as an identifier.
Attributes:
query
- the query string
def generate_identifier() -> Identifier
This method returns the identifier for this entity.
Returns:
Identifier
- the identifier for this entity with identifier_type=SimpleIdentifierType.QUERY.
class ProductEntity(WyvernEntity)
ProductEntity is a base class for all entities that have product_id as an identifier.
Attributes:
product_id
- the product id
def generate_identifier() -> Identifier
This method returns the identifier for this entity.
Returns:
Identifier
- the identifier for this entity with identifier_type=SimpleIdentifierType.PRODUCT.
class UserEntity(WyvernEntity)
UserEntity is a base class for all entities that have user_id as an identifier.
Attributes:
user_id
- the user id
def generate_identifier() -> Identifier
This method returns the identifier for this entity.
Returns:
Identifier
- the identifier for this entity with identifier_type=SimpleIdentifierType.USER.
class BaseWyvernRequest(WyvernDataModel)
Base class for all Wyvern requests. This class is used to generate an identifier for the request.
Attributes:
request_id
- The request id.
include_events
- Whether to include events in the response.
def generate_identifier() -> Identifier
Generates an identifier for the request.
Returns:
Identifier
- The identifier for the request. The identifier type is "request".
class FeatureData(BaseModel)
A class to represent the features of an entity.
Attributes:
identifier
- The identifier of the entity.
features
- A dictionary of feature names to feature values.
class FeatureMap(BaseModel)
A class to represent a map of identifiers to feature data.
TODO (kerem): Fix the data duplication between this class and the FeatureData class. The identifier field in the FeatureData class is redundant.
def build_empty_feature_map(identifiers: List[Identifier],
feature_names: List[str]) -> FeatureMap
Builds an empty feature map with the given identifiers and feature names.
class SimpleIdentifierType(str, Enum)
Simple identifier types are those that are not composite.
def composite(primary_identifier_type: SimpleIdentifierType,
secondary_identifier_type: SimpleIdentifierType) -> str
Composite identifier types are those that are composite. For example, a product with id p_1234 and type "product" and a user with id u_1234 and type "user" would have a composite identifier of "p_1234:u_1234", and a composite identifier_type of "product:user". This is useful for indexing and searching for composite entities.
class CompositeIdentifierType(str, Enum)
Composite identifier types are those that are composite. For example, a composite identifier type of "product:user" would be a composite identifier type for a product and a user. This is useful for indexing and searching for composite entities.
class Identifier(BaseModel)
Identifiers exist to represent a unique entity through their unique id and their type. For example: a product with id p_1234 and type "product" or a user with id u_1234 and type "user"
Composite identifiers are also possible, for example: a product with id p_1234 and type "product" and a user with id u_1234 and type "user"
The composite identifier would be "p_1234:u_1234", and the composite identifier_type would be "product:user"
class CompositeIdentifier(Identifier)
Composite identifiers exist to represent a unique entity through their unique id and their type. At most, they can have two identifiers and two identifier types. For example: a product with id p_1234 and type "product" and a user with id u_1234 and type "user"
The composite identifier would be "p_1234:u_1234", and the composite identifier_type would be "product:user".
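The composition rule in this example can be sketched in a few lines. `SimpleId` and `compose` are hypothetical names used only for illustration; they are not the `Identifier` or `CompositeIdentifier` classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SimpleId:
    """Hypothetical stand-in for an identifier: an id plus its type."""
    identifier: str
    identifier_type: str


def compose(primary: SimpleId, secondary: SimpleId) -> SimpleId:
    """Join ids and types with ":" in primary, secondary order."""
    return SimpleId(
        identifier=f"{primary.identifier}:{secondary.identifier}",
        identifier_type=f"{primary.identifier_type}:{secondary.identifier_type}",
    )


product = SimpleId("p_1234", "product")
user = SimpleId("u_1234", "user")
composite = compose(product, user)
```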
class ScoredCandidate(GenericModel, Generic[GENERALIZED_WYVERN_ENTITY])
A candidate entity with a score.
Attributes:
entity
- The candidate entity.
score
- The score of the candidate entity. Defaults to 0.0.
class CandidateSetEntity(WyvernDataModel, GenericModel,
Generic[GENERALIZED_WYVERN_ENTITY])
A set of candidate entities. This is a generic model that can be used to represent a set of candidate entities.
Attributes:
candidates
- The list of candidate entities.
@wyvern_cli_app.command()
def run(host: str = "127.0.0.1", port: int = 8000) -> None
Run your wyvern service
async def sample_product_query_ranking_request() -> None
How to run this: python wyvern/examples/example_business_logic.py
Json representation of the request:
{
"request_id": "rrr",
"query": "candle",
"candidates": [
{"product_id": "1", "product_name": "scented candle"},
{"product_id": "2", "product_name": "hot candle"},
{"product_id": "3", "product_name": "pumpkin candle"},
{"product_id": "4", "product_name": "unrelated item"},
{"product_id": "5", "product_name": "candle holder accessory"},
{"product_id": "6", "product_name": "earwax holder"},
{"product_id": "7", "product_name": "wax seal"}
]
}
@wyvern_cli_app.command()
def run(host: str = "127.0.0.1", port: int = 8000) -> None
Run your wyvern service