Skip to content

Commit

Permalink
support job service control loop
Browse files Browse the repository at this point in the history
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Jan 15, 2021
1 parent eaaf233 commit 22ead1d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 22 deletions.
6 changes: 6 additions & 0 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ def pytest_addoption(parser):
parser.addoption("--feast-project", action="store", default="default")
parser.addoption("--statsd-url", action="store", default="localhost:8125")
parser.addoption("--prometheus-url", action="store", default="localhost:9102")
parser.addoption(
"--scheduled-streaming-job",
action="store_true",
help="When set tests won't manually start streaming jobs,"
" instead jobservice's loop is responsible for that",
)


def pytest_runtest_setup(item):
Expand Down
21 changes: 13 additions & 8 deletions tests/e2e/test_online_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def test_offline_ingestion_from_bq_view(pytestconfig, bq_dataset, feast_client:


def test_streaming_ingestion(
feast_client: Client, local_staging_path: str, kafka_server
feast_client: Client, local_staging_path: str, kafka_server, pytestconfig
):
entity = Entity(name="s2id", description="S2id", value_type=ValueType.INT64,)
kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
Expand All @@ -124,12 +124,14 @@ def test_streaming_ingestion(
feast_client.apply(entity)
feast_client.apply(feature_table)

job = feast_client.start_stream_to_online_ingestion(feature_table)
assert job.get_feature_table() == feature_table.name

wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
)
if not pytestconfig.getoption("scheduled_streaming_job"):
job = feast_client.start_stream_to_online_ingestion(feature_table)
assert job.get_feature_table() == feature_table.name
wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
)
else:
job = None

wait_retry_backoff(
lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
Expand All @@ -148,7 +150,10 @@ def test_streaming_ingestion(
feature_names=["drivers_stream:unique_drivers"],
)
finally:
job.cancel()
if job:
job.cancel()
else:
feast_client.delete_feature_table(feature_table.name)

pd.testing.assert_frame_equal(
ingested[["s2id", "drivers_stream:unique_drivers"]],
Expand Down
38 changes: 24 additions & 14 deletions tests/e2e/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,25 @@ def create_schema(kafka_broker, topic_name, feature_table_name):
return entity, feature_table


def test_validation_with_ge(feast_client: Client, kafka_server):
def start_job(feast_client: Client, feature_table: FeatureTable, pytestconfig):
    """Launch a streaming ingestion job for *feature_table*, unless tests run
    with ``--scheduled-streaming-job`` (then the jobservice control loop is
    expected to start it and ``None`` is returned instead of a job handle).
    """
    if pytestconfig.getoption("scheduled_streaming_job"):
        # Jobservice loop owns job startup in this mode; nothing to launch here.
        return None

    streaming_job = feast_client.start_stream_to_online_ingestion(feature_table)
    # Block until the job is actually running (up to 120s).
    wait_retry_backoff(
        lambda: (None, streaming_job.get_status() == SparkJobStatus.IN_PROGRESS),
        120,
    )
    return streaming_job


def stop_job(job, feast_client: Client, feature_table: FeatureTable):
    """Tear down streaming ingestion after a test.

    When *job* was started manually (truthy handle) it is cancelled directly;
    otherwise the feature table is deleted so the jobservice control loop
    stops the job it scheduled for that table.
    """
    if not job:
        # No handle: the jobservice loop started the job; deleting the table
        # is the signal for the loop to shut it down.
        feast_client.delete_feature_table(feature_table.name)
    else:
        job.cancel()


def test_validation_with_ge(feast_client: Client, kafka_server, pytestconfig):
kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
topic_name = f"avro-{uuid.uuid4()}"

Expand All @@ -82,11 +100,7 @@ def test_validation_with_ge(feast_client: Client, kafka_server):
udf = create_validation_udf("testUDF", expectations, feature_table)
apply_validation(feast_client, feature_table, udf, validation_window_secs=1)

job = feast_client.start_stream_to_online_ingestion(feature_table)

wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
)
job = start_job(feast_client, feature_table, pytestconfig)

wait_retry_backoff(
lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
Expand Down Expand Up @@ -117,7 +131,7 @@ def test_validation_with_ge(feast_client: Client, kafka_server):
expected_ingested_count=test_data.shape[0] - len(invalid_idx),
)
finally:
job.cancel()
stop_job(job, feast_client, feature_table)

test_data["num"] = test_data["num"].astype(np.float64)
test_data["num"].iloc[invalid_idx] = np.nan
Expand All @@ -133,7 +147,7 @@ def test_validation_with_ge(feast_client: Client, kafka_server):

@pytest.mark.env("local")
def test_validation_reports_metrics(
feast_client: Client, kafka_server, statsd_server: StatsDServer
feast_client: Client, kafka_server, statsd_server: StatsDServer, pytestconfig
):
kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
topic_name = f"avro-{uuid.uuid4()}"
Expand All @@ -153,11 +167,7 @@ def test_validation_reports_metrics(
udf = create_validation_udf("testUDF", expectations, feature_table)
apply_validation(feast_client, feature_table, udf, validation_window_secs=10)

job = feast_client.start_stream_to_online_ingestion(feature_table)

wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
)
job = start_job(feast_client, feature_table, pytestconfig)

wait_retry_backoff(
lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
Expand Down Expand Up @@ -196,7 +206,7 @@ def test_validation_reports_metrics(
expected_ingested_count=test_data.shape[0] - len(invalid_idx),
)
finally:
job.cancel()
stop_job(job, feast_client, feature_table)

expected_metrics = [
(
Expand Down

0 comments on commit 22ead1d

Please sign in to comment.