Skip to content

Commit

Permalink
feat: Add workgroup to athena offline store config (#3139)
Browse files Browse the repository at this point in the history
* fix: FeastModuleImportError

Signed-off-by: derek1032 <dchang@health2sync.com>

* chore: Add aws region for athena universal tests

Signed-off-by: derek1032 <dchang@health2sync.com>

* feat: Add workgroup attribute for Athena offline store config

Signed-off-by: derek1032 <dchang@health2sync.com>

* fix: Add fail_if_exists condition to query string

Signed-off-by: derek1032 <dchang@health2sync.com>

* chore: Format Python code

Signed-off-by: derek1032 <dchang@health2sync.com>

* chore: use os.getenv and default value

Signed-off-by: derek1032 <dchang@health2sync.com>

Signed-off-by: derek1032 <dchang@health2sync.com>
  • Loading branch information
derek1032 authored Aug 25, 2022
1 parent c1c71da commit a752211
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 50 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,19 @@ test-python-universal-mssql:
sdk/python/tests


#To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. https://docs.aws.amazon.com/athena/latest/ug/getting-started.html
#Modify environment variables ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_S3_BUCKET_NAME if you want to change the data source, database, and bucket name of S3 to use.
#If tests fail with the pytest -n 8 option, change the number to 1.
# To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS.
# https://docs.aws.amazon.com/athena/latest/ug/getting-started.html
# Set the environment variables ATHENA_REGION, ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_WORKGROUP, and
# ATHENA_S3_BUCKET_NAME to match your AWS setup. If the tests fail with the pytest -n 8 option, change the number to 1.
test-python-universal-athena:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.athena_repo_configuration \
PYTEST_PLUGINS=feast.infra.offline_stores.contrib.athena_offline_store.tests \
FEAST_USAGE=False IS_TEST=True \
ATHENA_REGION=ap-northeast-2 \
ATHENA_DATA_SOURCE=AwsDataCatalog \
ATHENA_DATABASE=default \
ATHENA_WORKGROUP=primary \
ATHENA_S3_BUCKET_NAME=feast-integration-tests \
python -m pytest -n 8 --integration \
-k "not test_go_feature_server and \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class AthenaOfflineStoreConfig(FeastConfigBaseModel):
database: StrictStr
""" Athena database name """

workgroup: StrictStr
""" Athena workgroup name """

s3_staging_location: StrictStr
""" S3 path for importing & exporting data to Athena """

Expand Down Expand Up @@ -243,6 +246,7 @@ def query_generator() -> Iterator[str]:
athena_client,
config.offline_store.data_source,
config.offline_store.database,
config.offline_store.workgroup,
f"DROP TABLE IF EXISTS {config.offline_store.database}.{table_name}",
)

Expand Down Expand Up @@ -293,6 +297,7 @@ def write_logged_features(
athena_client=athena_client,
data_source=config.offline_store.data_source,
database=config.offline_store.database,
workgroup=config.offline_store.workgroup,
s3_resource=s3_resource,
s3_path=s3_path,
table_name=destination.table_name,
Expand Down Expand Up @@ -378,6 +383,7 @@ def _to_df_internal(self) -> pd.DataFrame:
self._athena_client,
self._config.offline_store.data_source,
self._config.offline_store.database,
self._config.offline_store.workgroup,
self._s3_resource,
temp_external_location,
self.get_temp_table_dml_header(temp_table_name, temp_external_location)
Expand All @@ -394,6 +400,7 @@ def _to_arrow_internal(self) -> pa.Table:
self._athena_client,
self._config.offline_store.data_source,
self._config.offline_store.database,
self._config.offline_store.workgroup,
self._s3_resource,
temp_external_location,
self.get_temp_table_dml_header(temp_table_name, temp_external_location)
Expand Down Expand Up @@ -432,6 +439,7 @@ def to_athena(self, table_name: str) -> None:
self._athena_client,
self._config.offline_store.data_source,
self._config.offline_store.database,
self._config.offline_store.workgroup,
query,
)

Expand All @@ -449,6 +457,7 @@ def _upload_entity_df(
athena_client,
config.offline_store.data_source,
config.offline_store.database,
config.offline_store.workgroup,
s3_resource,
f"{config.offline_store.s3_staging_location}/entity_df/{table_name}/{table_name}.parquet",
table_name,
Expand All @@ -460,6 +469,7 @@ def _upload_entity_df(
athena_client,
config.offline_store.data_source,
config.offline_store.database,
config.offline_store.workgroup,
f"CREATE TABLE {table_name} AS ({entity_df})",
)
else:
Expand Down Expand Up @@ -514,6 +524,7 @@ def _get_entity_df_event_timestamp_range(
athena_client,
config.offline_store.data_source,
config.offline_store.database,
config.offline_store.workgroup,
f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max "
f"FROM ({entity_df})",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def get_table_column_names_and_types(
client,
config.offline_store.data_source,
config.offline_store.database,
config.offline_store.workgroup,
f"SELECT * FROM ({self.query}) LIMIT 1",
)
columns = aws_utils.get_athena_query_result(client, statement_id)[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,20 @@ class AthenaDataSourceCreator(DataSourceCreator):

def __init__(self, project_name: str, *args, **kwargs):
super().__init__(project_name)
self.client = aws_utils.get_athena_data_client("ap-northeast-2")
self.s3 = aws_utils.get_s3_resource("ap-northeast-2")
data_source = (
os.environ.get("ATHENA_DATA_SOURCE")
if os.environ.get("ATHENA_DATA_SOURCE")
else "AwsDataCatalog"
)
database = (
os.environ.get("ATHENA_DATABASE")
if os.environ.get("ATHENA_DATABASE")
else "default"
)
bucket_name = (
os.environ.get("ATHENA_S3_BUCKET_NAME")
if os.environ.get("ATHENA_S3_BUCKET_NAME")
else "feast-integration-tests"
)

region = os.getenv("ATHENA_REGION", "ap-northeast-2")
data_source = os.getenv("ATHENA_DATA_SOURCE", "AwsDataCatalog")
database = os.getenv("ATHENA_DATABASE", "default")
workgroup = os.getenv("ATHENA_WORKGROUP", "primary")
bucket_name = os.getenv("ATHENA_S3_BUCKET_NAME", "feast-integration-tests")

self.client = aws_utils.get_athena_data_client(region)
self.s3 = aws_utils.get_s3_resource(region)
self.offline_store_config = AthenaOfflineStoreConfig(
data_source=f"{data_source}",
region="ap-northeast-2",
database=f"{database}",
data_source=data_source,
region=region,
database=database,
workgroup=workgroup,
s3_staging_location=f"s3://{bucket_name}/test_dir",
)

Expand Down Expand Up @@ -77,6 +70,7 @@ def create_data_source(
self.client,
self.offline_store_config.data_source,
self.offline_store_config.database,
self.offline_store_config.workgroup,
self.s3,
s3_target,
table_name,
Expand Down Expand Up @@ -126,5 +120,6 @@ def teardown(self):
self.client,
self.offline_store_config.data_source,
self.offline_store_config.database,
self.offline_store_config.workgroup,
f"DROP TABLE IF EXISTS {table}",
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from feast.infra.offline_stores.contrib.athena_offline_store.tests.data_source import (
AthenaDataSourceCreator,
)
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.universal.data_sources.athena import (
AthenaDataSourceCreator,
)

FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(
Expand Down
Loading

0 comments on commit a752211

Please sign in to comment.