diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py
index c893dca71f..444dc10c3d 100644
--- a/tests/e2e/redis/basic-ingest-redis-serving.py
+++ b/tests/e2e/redis/basic-ingest-redis-serving.py
@@ -28,27 +28,26 @@ import uuid
 
 
 FLOAT_TOLERANCE = 0.00001
-PROJECT_NAME = 'basic_' + uuid.uuid4().hex.upper()[0:6]
+PROJECT_NAME = "basic_" + uuid.uuid4().hex.upper()[0:6]
 DIR_PATH = os.path.dirname(os.path.realpath(__file__))
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def core_url(pytestconfig):
     return pytestconfig.getoption("core_url")
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def serving_url(pytestconfig):
     return pytestconfig.getoption("serving_url")
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def allow_dirty(pytestconfig):
-    return True if pytestconfig.getoption(
-        "allow_dirty").lower() == "true" else False
+    return True if pytestconfig.getoption("allow_dirty").lower() == "true" else False
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def client(core_url, serving_url, allow_dirty):
     # Get client for core and serving
     client = Client(core_url=core_url, serving_url=serving_url)
@@ -68,8 +67,7 @@ def client(core_url, serving_url, allow_dirty):
 def basic_dataframe(entities, features, ingest_time, n_size):
     offset = random.randint(1000, 100000)  # ensure a unique key space is used
     df_dict = {
-        "datetime": [ingest_time.replace(tzinfo=pytz.utc) for _ in
-                     range(n_size)],
+        "datetime": [ingest_time.replace(tzinfo=pytz.utc) for _ in range(n_size)],
     }
     for entity_name in entities:
         df_dict[entity_name] = list(range(1, n_size + 1))
@@ -85,31 +83,37 @@ def ingest_time():
 
 @pytest.fixture(scope="module")
 def cust_trans_df(ingest_time):
-    return basic_dataframe(entities=["customer_id"],
-                           features=["daily_transactions", "total_transactions"],
-                           ingest_time=ingest_time,
-                           n_size=5)
+    return basic_dataframe(
+        entities=["customer_id"],
+        features=["daily_transactions", "total_transactions"],
+        ingest_time=ingest_time,
+        n_size=5,
+    )
 
 
 @pytest.fixture(scope="module")
 def driver_df(ingest_time):
-    return basic_dataframe(entities=["driver_id"],
-                           features=["rating", "cost"],
-                           ingest_time=ingest_time,
-                           n_size=5)
+    return basic_dataframe(
+        entities=["driver_id"],
+        features=["rating", "cost"],
+        ingest_time=ingest_time,
+        n_size=5,
+    )
 
 
 def test_version_returns_results(client):
     version_info = client.version()
-    assert not version_info['core'] is 'not configured'
-    assert not version_info['serving'] is 'not configured'
+    assert not version_info["core"] is "not configured"
+    assert not version_info["serving"] is "not configured"
 
 
 @pytest.mark.timeout(45)
 @pytest.mark.run(order=10)
 def test_basic_register_feature_set_success(client):
     # Register feature set without project
-    cust_trans_fs_expected = FeatureSet.from_yaml(f"{DIR_PATH}/basic/cust_trans_fs.yaml")
+    cust_trans_fs_expected = FeatureSet.from_yaml(
+        f"{DIR_PATH}/basic/cust_trans_fs.yaml"
+    )
     driver_fs_expected = FeatureSet.from_yaml(f"{DIR_PATH}/basic/driver_fs.yaml")
     client.apply(cust_trans_fs_expected)
     client.apply(driver_fs_expected)
@@ -119,11 +123,14 @@ def test_basic_register_feature_set_success(client):
     assert driver_fs_actual == driver_fs_expected
 
     # Register feature set with project
-    cust_trans_fs_expected = FeatureSet.from_yaml(f"{DIR_PATH}/basic/cust_trans_fs.yaml")
+    cust_trans_fs_expected = FeatureSet.from_yaml(
+        f"{DIR_PATH}/basic/cust_trans_fs.yaml"
+    )
     client.set_project(PROJECT_NAME)
     client.apply(cust_trans_fs_expected)
-    cust_trans_fs_actual = client.get_feature_set("customer_transactions",
-                                                  project=PROJECT_NAME)
+    cust_trans_fs_actual = client.get_feature_set(
+        "customer_transactions", project=PROJECT_NAME
+    )
     assert cust_trans_fs_actual == cust_trans_fs_expected
 
     # Register feature set with labels
@@ -172,27 +179,21 @@ def test_basic_retrieve_online_success(client, cust_trans_df):
                 )
             ],
             # Test retrieve with different variations of the string feature refs
-            feature_refs=[
-                "daily_transactions",
-                "total_transactions",
-            ]
+            feature_refs=["daily_transactions", "total_transactions",],
         )  # type: GetOnlineFeaturesResponse
 
         if response is None:
             continue
 
         returned_daily_transactions = float(
-            response.field_values[0]
-            .fields["daily_transactions"]
-            .float_val
+            response.field_values[0].fields["daily_transactions"].float_val
         )
-        sent_daily_transactions = float(
-            cust_trans_df.iloc[0]["daily_transactions"])
+        sent_daily_transactions = float(cust_trans_df.iloc[0]["daily_transactions"])
 
         if math.isclose(
-                sent_daily_transactions,
-                returned_daily_transactions,
-                abs_tol=FLOAT_TOLERANCE,
+            sent_daily_transactions,
+            returned_daily_transactions,
+            abs_tol=FLOAT_TOLERANCE,
         ):
             break
 
@@ -217,9 +218,7 @@ def test_basic_retrieve_online_multiple_featureset(client, cust_trans_df, driver
                     "customer_id": Value(
                         int64_val=cust_trans_df.iloc[0]["customer_id"]
                     ),
-                    "driver_id": Value(
-                        int64_val=driver_df.iloc[0]["driver_id"]
-                    )
+                    "driver_id": Value(int64_val=driver_df.iloc[0]["driver_id"]),
                 }
             )
         ],
@@ -231,9 +230,7 @@ def check_response(ingest_df, response, feature_ref):
         returned_value = float(
-            response.field_values[0]
-            .fields[feature_ref]
-            .float_val
+            response.field_values[0].fields[feature_ref].float_val
        )
         feature_ref_splits = feature_ref.split(":")
         if len(feature_ref_splits) == 1:
@@ -241,16 +238,13 @@
         else:
             _, feature_name = feature_ref_splits
 
-        sent_value = float(
-            ingest_df.iloc[0][feature_name])
+        sent_value = float(ingest_df.iloc[0][feature_name])
 
-        return math.isclose(
-            sent_value,
-            returned_value,
-            abs_tol=FLOAT_TOLERANCE,
-        )
+        return math.isclose(sent_value, returned_value, abs_tol=FLOAT_TOLERANCE,)
 
-        if all([check_response(df, response, ref) for ref, df in feature_ref_df_mapping]):
+        if all(
+            [check_response(df, response, ref) for ref, df in feature_ref_df_mapping]
+        ):
             break
@@ -260,9 +254,12 @@ def test_basic_ingest_jobs(client):
     # list ingestion jobs given featureset
     cust_trans_fs = client.get_feature_set(name="customer_transactions")
     ingest_jobs = client.list_ingest_jobs(
-        feature_set_ref=FeatureSetRef.from_feature_set(cust_trans_fs))
+        feature_set_ref=FeatureSetRef.from_feature_set(cust_trans_fs)
+    )
     # filter ingestion jobs to only those that are running
-    ingest_jobs = [job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING]
+    ingest_jobs = [
+        job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING
+    ]
     assert len(ingest_jobs) >= 1
 
     for ingest_job in ingest_jobs:
@@ -277,18 +274,16 @@
         assert ingest_job.status == IngestionJobStatus.ABORTED
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def all_types_dataframe():
     return pd.DataFrame(
         {
-            "datetime": [datetime.utcnow().replace(tzinfo=pytz.utc) for _ in
-                         range(3)],
+            "datetime": [datetime.utcnow().replace(tzinfo=pytz.utc) for _ in range(3)],
             "user_id": [1001, 1002, 1003],
             "int32_feature": [np.int32(1), np.int32(2), np.int32(3)],
             "int64_feature": [np.int64(1), np.int64(2), np.int64(3)],
             "float_feature": [np.float(0.1), np.float(0.2), np.float(0.3)],
-            "double_feature": [np.float64(0.1), np.float64(0.2),
-                               np.float64(0.3)],
+            "double_feature": [np.float64(0.1), np.float64(0.2), np.float64(0.3)],
             "string_feature": ["one", "two", "three"],
             "bytes_feature": [b"one", b"two", b"three"],
             "bool_feature": [True, False, False],
@@ -350,8 +345,7 @@ def test_all_types_register_feature_set_success(client):
             Feature(name="float_list_feature", dtype=ValueType.FLOAT_LIST),
             Feature(name="int64_list_feature", dtype=ValueType.INT64_LIST),
             Feature(name="int32_list_feature", dtype=ValueType.INT32_LIST),
-            Feature(name="string_list_feature",
-                    dtype=ValueType.STRING_LIST),
+            Feature(name="string_list_feature", dtype=ValueType.STRING_LIST),
             Feature(name="bytes_list_feature", dtype=ValueType.BYTES_LIST),
         ],
         max_age=Duration(seconds=3600),
@@ -397,8 +391,11 @@ def test_all_types_retrieve_online_success(client, all_types_dataframe):
         response = client.get_online_features(
             entity_rows=[
                 GetOnlineFeaturesRequest.EntityRow(
-                    fields={"user_id": Value(
-                        int64_val=all_types_dataframe.iloc[0]["user_id"])}
+                    fields={
+                        "user_id": Value(
+                            int64_val=all_types_dataframe.iloc[0]["user_id"]
+                        )
+                    }
                 )
             ],
             feature_refs=[
@@ -422,15 +419,13 @@
             continue
 
         returned_float_list = (
-            response.field_values[0]
-            .fields["float_list_feature"]
-            .float_list_val.val
+            response.field_values[0].fields["float_list_feature"].float_list_val.val
         )
 
         sent_float_list = all_types_dataframe.iloc[0]["float_list_feature"]
 
         if math.isclose(
-                returned_float_list[0], sent_float_list[0], abs_tol=FLOAT_TOLERANCE
+            returned_float_list[0], sent_float_list[0], abs_tol=FLOAT_TOLERANCE
         ):
             break
 
@@ -441,9 +436,12 @@ def test_all_types_ingest_jobs(client, all_types_dataframe):
     # list ingestion jobs given featureset
    all_types_fs = client.get_feature_set(name="all_types")
     ingest_jobs = client.list_ingest_jobs(
-        feature_set_ref=FeatureSetRef.from_feature_set(all_types_fs))
+        feature_set_ref=FeatureSetRef.from_feature_set(all_types_fs)
+    )
     # filter ingestion jobs to only those that are running
-    ingest_jobs = [job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING]
+    ingest_jobs = [
+        job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING
+    ]
     assert len(ingest_jobs) >= 1
 
     for ingest_job in ingest_jobs:
@@ -458,15 +456,14 @@
         assert ingest_job.status == IngestionJobStatus.ABORTED
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def large_volume_dataframe():
     ROW_COUNT = 100000
     offset = random.randint(1000000, 10000000)  # ensure a unique key space
     customer_data = pd.DataFrame(
         {
             "datetime": [
-                datetime.utcnow().replace(tzinfo=pytz.utc) for _ in
-                range(ROW_COUNT)
+                datetime.utcnow().replace(tzinfo=pytz.utc) for _ in range(ROW_COUNT)
             ],
             "customer_id": [offset + inc for inc in range(ROW_COUNT)],
             "daily_transactions_large": [np.random.rand() for _ in range(ROW_COUNT)],
@@ -480,7 +477,8 @@
 @pytest.mark.run(order=30)
 def test_large_volume_register_feature_set_success(client):
     cust_trans_fs_expected = FeatureSet.from_yaml(
-        f"{DIR_PATH}/large_volume/cust_trans_large_fs.yaml")
+        f"{DIR_PATH}/large_volume/cust_trans_large_fs.yaml"
+    )
 
     # Register feature set
     client.apply(cust_trans_fs_expected)
@@ -488,8 +486,7 @@ def test_large_volume_register_feature_set_success(client):
     # Feast Core needs some time to fully commit the FeatureSet applied
     # when there is no existing job yet for the Featureset
     time.sleep(10)
-    cust_trans_fs_actual = client.get_feature_set(
-        name="customer_transactions_large")
+    cust_trans_fs_actual = client.get_feature_set(name="customer_transactions_large")
 
     assert cust_trans_fs_actual == cust_trans_fs_expected
 
@@ -524,80 +521,73 @@ def test_large_volume_retrieve_online_success(client, large_volume_dataframe):
                 GetOnlineFeaturesRequest.EntityRow(
                     fields={
                         "customer_id": Value(
-                            int64_val=large_volume_dataframe.iloc[0][
-                                "customer_id"]
+                            int64_val=large_volume_dataframe.iloc[0]["customer_id"]
                         )
                     }
                 )
             ],
-            feature_refs=[
-                "daily_transactions_large",
-                "total_transactions_large",
-            ],
+            feature_refs=["daily_transactions_large", "total_transactions_large",],
         )  # type: GetOnlineFeaturesResponse
 
         if response is None:
             continue
 
         returned_daily_transactions = float(
-            response.field_values[0]
-            .fields["daily_transactions_large"]
-            .float_val
+            response.field_values[0].fields["daily_transactions_large"].float_val
         )
         sent_daily_transactions = float(
-            large_volume_dataframe.iloc[0]["daily_transactions_large"])
+            large_volume_dataframe.iloc[0]["daily_transactions_large"]
+        )
 
         if math.isclose(
-                sent_daily_transactions,
-                returned_daily_transactions,
-                abs_tol=FLOAT_TOLERANCE,
+            sent_daily_transactions,
+            returned_daily_transactions,
+            abs_tol=FLOAT_TOLERANCE,
        ):
             break
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def all_types_parquet_file():
     COUNT = 20000
     df = pd.DataFrame(
         {
             "datetime": [datetime.utcnow() for _ in range(COUNT)],
-            "customer_id": [np.int32(random.randint(0, 10000)) for _ in
-                            range(COUNT)],
-            "int32_feature_parquet": [np.int32(random.randint(0, 10000)) for _ in
-                                      range(COUNT)],
-            "int64_feature_parquet": [np.int64(random.randint(0, 10000)) for _ in
-                                      range(COUNT)],
+            "customer_id": [np.int32(random.randint(0, 10000)) for _ in range(COUNT)],
+            "int32_feature_parquet": [
+                np.int32(random.randint(0, 10000)) for _ in range(COUNT)
+            ],
+            "int64_feature_parquet": [
+                np.int64(random.randint(0, 10000)) for _ in range(COUNT)
+            ],
             "float_feature_parquet": [np.float(random.random()) for _ in range(COUNT)],
-            "double_feature_parquet": [np.float64(random.random()) for _ in
-                                       range(COUNT)],
-            "string_feature_parquet": ["one" + str(random.random()) for _ in
-                                       range(COUNT)],
+            "double_feature_parquet": [
+                np.float64(random.random()) for _ in range(COUNT)
+            ],
+            "string_feature_parquet": [
+                "one" + str(random.random()) for _ in range(COUNT)
+            ],
             "bytes_feature_parquet": [b"one" for _ in range(COUNT)],
             "int32_list_feature_parquet": [
                 np.array([1, 2, 3, random.randint(0, 10000)], dtype=np.int32)
-                for _
-                in range(COUNT)
+                for _ in range(COUNT)
             ],
             "int64_list_feature_parquet": [
                 np.array([1, random.randint(0, 10000), 3, 4], dtype=np.int64)
-                for _
-                in range(COUNT)
+                for _ in range(COUNT)
             ],
             "float_list_feature_parquet": [
-                np.array([1.1, 1.2, 1.3, random.random()], dtype=np.float32) for
-                _
-                in range(COUNT)
+                np.array([1.1, 1.2, 1.3, random.random()], dtype=np.float32)
+                for _ in range(COUNT)
             ],
             "double_list_feature_parquet": [
-                np.array([1.1, 1.2, 1.3, random.random()], dtype=np.float64) for
-                _
-                in range(COUNT)
+                np.array([1.1, 1.2, 1.3, random.random()], dtype=np.float64)
+                for _ in range(COUNT)
             ],
             "string_list_feature_parquet": [
-                np.array(["one", "two" + str(random.random()), "three"]) for _
-                in
-                range(COUNT)
+                np.array(["one", "two" + str(random.random()), "three"])
+                for _ in range(COUNT)
             ],
             "bytes_list_feature_parquet": [
                 np.array([b"one", b"two", b"three"]) for _ in range(COUNT)
             ],
@@ -608,7 +598,7 @@ def all_types_parquet_file():
 
     # TODO: Boolean list is not being tested.
     # https://github.com/feast-dev/feast/issues/341
-    file_path = os.path.join(tempfile.mkdtemp(), 'all_types.parquet')
+    file_path = os.path.join(tempfile.mkdtemp(), "all_types.parquet")
     df.to_parquet(file_path, allow_truncated_timestamps=True)
     return file_path
 
@@ -618,7 +608,8 @@ def test_all_types_parquet_register_feature_set_success(client):
     # Load feature set from file
     all_types_parquet_expected = FeatureSet.from_yaml(
-        f"{DIR_PATH}/all_types_parquet/all_types_parquet.yaml")
+        f"{DIR_PATH}/all_types_parquet/all_types_parquet.yaml"
+    )
 
     # Register feature set
     client.apply(all_types_parquet_expected)
@@ -642,8 +633,7 @@
 
 @pytest.mark.timeout(600)
 @pytest.mark.run(order=41)
-def test_all_types_infer_register_ingest_file_success(client,
-                                                      all_types_parquet_file):
+def test_all_types_infer_register_ingest_file_success(client, all_types_parquet_file):
     # Get feature set
     all_types_fs = client.get_feature_set(name="all_types_parquet")
 
@@ -660,14 +650,14 @@ class TestsBasedOnGrpc:
     @pytest.fixture(scope="module")
     def core_service_stub(self, core_url):
         if core_url.endswith(":443"):
-            core_channel = grpc.secure_channel(
-                core_url, grpc.ssl_channel_credentials()
-            )
+            core_channel = grpc.secure_channel(core_url, grpc.ssl_channel_credentials())
         else:
             core_channel = grpc.insecure_channel(core_url)
 
         try:
-            grpc.channel_ready_future(core_channel).result(timeout=self.GRPC_CONNECTION_TIMEOUT)
+            grpc.channel_ready_future(core_channel).result(
+                timeout=self.GRPC_CONNECTION_TIMEOUT
+            )
         except grpc.FutureTimeoutError:
             raise ConnectionError(
                 f"Connection timed out while attempting to connect to Feast "