merge master #4002

Merged 12 commits on Mar 9, 2024

Changes from all commits
7 changes: 4 additions & 3 deletions .github/workflows/lint_pr.yml
@@ -7,12 +7,13 @@ on:
- edited
- synchronize

permissions:
# read-only perms specified due to use of pull_request_target in lieu of security label check
pull-requests: read

jobs:
validate-title:
# when using pull_request_target, all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes.
if:
((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) ||
(github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) &&
github.repository == 'feast-dev/feast'
name: Validate PR title
runs-on: ubuntu-latest
4 changes: 2 additions & 2 deletions .github/workflows/pr_integration_tests.yml
@@ -86,7 +86,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [ "3.8", "3.10" ]
python-version: [ "3.10" ]
os: [ ubuntu-latest ]
env:
OS: ${{ matrix.os }}
@@ -167,4 +167,4 @@ jobs:
SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }}
SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }}
SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }}
-      run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread
\ No newline at end of file
+      run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread
2 changes: 1 addition & 1 deletion .github/workflows/pr_local_integration_tests.yml
@@ -19,7 +19,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [ "3.8", "3.10" ]
python-version: [ "3.10" ]
os: [ ubuntu-latest ]
env:
OS: ${{ matrix.os }}
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yml
@@ -7,7 +7,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [ "3.8", "3.9", "3.10" ]
python-version: [ "3.9", "3.10" ]
os: [ ubuntu-latest, macOS-latest ]
exclude:
- os: macOS-latest
2 changes: 2 additions & 0 deletions docs/reference/alpha-web-ui.md
@@ -85,6 +85,8 @@ When you start the React app, it will look for `project-list.json` to find a list
}
```

+* **Note** - `registryPath` only supports a file location or a url.

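For reference, a hedged sketch of what `project-list.json` might look like with a file-based `registryPath` — the project name, id, and path here are illustrative, not taken from this PR:

```json
{
  "projects": [
    {
      "name": "Credit Score Project",
      "description": "Project for credit scoring team and associated models.",
      "id": "credit_score_project",
      "registryPath": "/registry.json"
    }
  ]
}
```

Per the note above, a URL such as `http://localhost:8888/registry.json` would also be accepted as `registryPath`.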
Then start the React App

2 changes: 1 addition & 1 deletion docs/reference/offline-stores/spark.md
@@ -4,7 +4,7 @@

The Spark offline store provides support for reading [SparkSources](../data-sources/spark.md).

-* Entity dataframes can be provided as a SQL query or can be provided as a Pandas dataframe. A Pandas dataframes will be converted to a Spark dataframe and processed as a temporary view.
+* Entity dataframes can be provided as a SQL query, a Pandas dataframe, or a PySpark dataframe. A Pandas dataframe will be converted to a Spark dataframe and processed as a temporary view.

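As a hedged illustration of the new third option (this example is not part of the diff; the feature view and column names are assumed), an entity dataframe can now be built directly in PySpark and passed to `get_historical_features` without a Pandas round-trip:

```python
from datetime import datetime

from feast import FeatureStore
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
store = FeatureStore(repo_path=".")  # assumes a repo configured with the Spark offline store

# Entity dataframe built directly as a PySpark dataframe.
entity_df = spark.createDataFrame(
    [(1001, datetime(2024, 3, 1)), (1002, datetime(2024, 3, 1))],
    ["driver_id", "event_timestamp"],
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],  # hypothetical feature view
).to_df()
```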
## Disclaimer

@@ -4,8 +4,8 @@ set -e

# feast root directory is expected to be mounted (eg, by docker compose)
cd /mnt/feast
-pip install -e '.[redis]'
+pip install -e '.[grpcio,redis]'

cd /app
python materialize.py
-feast serve_transformations --port 8080
\ No newline at end of file
+feast serve_transformations --port 8080
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
@@ -415,3 +415,8 @@ def __init__(self):
class PushSourceNotFoundException(Exception):
def __init__(self, push_source_name: str):
super().__init__(f"Unable to find push source '{push_source_name}'.")


+class ReadOnlyRegistryException(Exception):
+    def __init__(self):
+        super().__init__("Registry implementation is read-only.")
4 changes: 4 additions & 0 deletions sdk/python/feast/feature_store.py
@@ -164,6 +164,10 @@ def __init__(
self._registry = SnowflakeRegistry(
registry_config, self.config.project, None
)
+        elif registry_config and registry_config.registry_type == "remote":
+            from feast.infra.registry.remote import RemoteRegistry
+
+            self._registry = RemoteRegistry(registry_config, self.config.project, None)
else:
r = Registry(self.config.project, registry_config, repo_path=self.repo_path)
r._initialize_registry(self.config.project)
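Both this branch and the matching one added to `registry.py` further down key off `registry_type: "remote"` in the repo config. A hedged sketch of a `feature_store.yaml` that would take this code path — the `path` value (host:port of a remotely served registry) is an assumption, not something this diff specifies:

```yaml
project: my_project
provider: local
registry:
  registry_type: remote
  path: localhost:6570  # assumed endpoint of a registry server
online_store:
  type: sqlite
```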
@@ -125,7 +125,7 @@ def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
-    entity_df: Union[pandas.DataFrame, str],
+    entity_df: Union[pandas.DataFrame, str, pyspark.sql.DataFrame],
registry: Registry,
project: str,
full_feature_names: bool = False,
@@ -473,15 +473,16 @@ def _get_entity_df_event_timestamp_range(
entity_df_event_timestamp.min().to_pydatetime(),
entity_df_event_timestamp.max().to_pydatetime(),
)
-    elif isinstance(entity_df, str):
+    elif isinstance(entity_df, str) or isinstance(entity_df, pyspark.sql.DataFrame):
         # If the entity_df is a string (SQL query), determine range
         # from table
-        df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col)
-
-        # Checks if executing entity sql resulted in any data
-        if df.rdd.isEmpty():
-            raise EntitySQLEmptyResults(entity_df)
-
+        if isinstance(entity_df, str):
+            df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col)
+            # Checks if executing entity sql resulted in any data
+            if df.rdd.isEmpty():
+                raise EntitySQLEmptyResults(entity_df)
+        else:
+            df = entity_df
         # TODO(kzhang132): need utc conversion here.

entity_df_event_timestamp_range = (
@@ -499,8 +500,11 @@ def _get_entity_schema(
) -> Dict[str, np.dtype]:
if isinstance(entity_df, pd.DataFrame):
return dict(zip(entity_df.columns, entity_df.dtypes))
-    elif isinstance(entity_df, str):
-        entity_spark_df = spark_session.sql(entity_df)
+    elif isinstance(entity_df, str) or isinstance(entity_df, pyspark.sql.DataFrame):
+        if isinstance(entity_df, str):
+            entity_spark_df = spark_session.sql(entity_df)
+        else:
+            entity_spark_df = entity_df
return dict(
zip(
entity_spark_df.columns,
@@ -526,6 +530,9 @@ def _upload_entity_df(
elif isinstance(entity_df, str):
spark_session.sql(entity_df).createOrReplaceTempView(table_name)
return
+    elif isinstance(entity_df, pyspark.sql.DataFrame):
+        entity_df.createOrReplaceTempView(table_name)
+        return
else:
raise InvalidEntityType(type(entity_df))

@@ -39,7 +39,6 @@ def __init__(
query: Optional[str] = None,
path: Optional[str] = None,
file_format: Optional[str] = None,
-        event_timestamp_column: Optional[str] = None,
created_timestamp_column: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
description: Optional[str] = "",
@@ -62,10 +62,11 @@ def __init__(
"must be include into pytest plugins"
)
self.exposed_port = self.container.get_exposed_port("8080")
+        self.container_host = self.container.get_container_host_ip()
self.client = Trino(
user="user",
catalog="memory",
host="localhost",
host=self.container_host,
port=self.exposed_port,
source="trino-python-client",
http_scheme="http",
@@ -123,7 +124,7 @@ def get_prefixed_table_name(self, suffix: str) -> str:

def create_offline_store_config(self) -> FeastConfigBaseModel:
return TrinoOfflineStoreConfig(
host="localhost",
host=self.container_host,
port=self.exposed_port,
catalog="memory",
dataset=self.project_name,
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/registry/base_registry.py
@@ -246,7 +246,7 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True):
@abstractmethod
def get_stream_feature_view(
self, name: str, project: str, allow_cache: bool = False
-    ):
+    ) -> StreamFeatureView:
"""
Retrieves a stream feature view.

4 changes: 4 additions & 0 deletions sdk/python/feast/infra/registry/registry.py
@@ -178,6 +178,10 @@ def __new__(
from feast.infra.registry.snowflake import SnowflakeRegistry

return SnowflakeRegistry(registry_config, project, repo_path)
+        elif registry_config and registry_config.registry_type == "remote":
+            from feast.infra.registry.remote import RemoteRegistry
+
+            return RemoteRegistry(registry_config, project, repo_path)
else:
return super(Registry, cls).__new__(cls)
