Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support s3gov schema by snowflake offline store during materialization #3891

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,17 @@ def to_remote_storage(self) -> List[str]:
HEADER = TRUE
"""
cursor = execute_snowflake_statement(self.snowflake_conn, query)
# s3gov schema is used by Snowflake in AWS govcloud regions
# remove gov portion from schema and pass it to online store upload
native_export_path = self.export_path.replace("s3gov://", "s3://")
return self._get_file_names_from_copy_into(cursor, native_export_path)

def _get_file_names_from_copy_into(self, cursor, native_export_path) -> List[str]:
file_name_column_index = [
idx for idx, rm in enumerate(cursor.description) if rm.name == "FILE_NAME"
][0]
return [
f"{self.export_path}/{row[file_name_column_index]}"
f"{native_export_path}/{row[file_name_column_index]}"
for row in cursor.fetchall()
]

Expand Down
57 changes: 57 additions & 0 deletions sdk/python/tests/unit/infra/offline_stores/test_snowflake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import re
from unittest.mock import ANY, MagicMock, patch

import pytest

from feast.infra.offline_stores.snowflake import (
SnowflakeOfflineStoreConfig,
SnowflakeRetrievalJob,
)
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.repo_config import RepoConfig


@pytest.fixture(params=["s3", "s3gov"])
def retrieval_job(request):
offline_store_config = SnowflakeOfflineStoreConfig(
type="snowflake.offline",
account="snow",
user="snow",
password="snow",
role="snow",
warehouse="snow",
database="FEAST",
schema="OFFLINE",
storage_integration_name="FEAST_S3",
blob_export_location=f"{request.param}://feast-snowflake-offload/export",
)
retrieval_job = SnowflakeRetrievalJob(
query="SELECT * FROM snowflake",
snowflake_conn=MagicMock(),
config=RepoConfig(
registry="s3://ml-test/repo/registry.db",
project="test",
provider="snowflake.offline",
online_store=SqliteOnlineStoreConfig(type="sqlite"),
offline_store=offline_store_config,
),
full_feature_names=True,
on_demand_feature_views=[],
)
return retrieval_job


def test_to_remote_storage(retrieval_job):
stored_files = ["just a path", "maybe another"]
with patch.object(
retrieval_job, "to_snowflake", return_value=None
) as mock_to_snowflake, patch.object(
retrieval_job, "_get_file_names_from_copy_into", return_value=stored_files
) as mock_get_file_names_from_copy:
assert (
retrieval_job.to_remote_storage() == stored_files
), "should return the list of files"
mock_to_snowflake.assert_called_once()
mock_get_file_names_from_copy.assert_called_once_with(ANY, ANY)
native_path = mock_get_file_names_from_copy.call_args[0][1]
assert re.match("^s3://.*", native_path), "path should be s3://*"
Loading