Commit: [FSTORE-1265] feature_group.materialization_job.run() fails with Spark engine (#1234)
SirOibaf authored Feb 24, 2024
1 parent fc8b352 commit 2cd5b2f
Showing 7 changed files with 176 additions and 184 deletions.
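For context, the failure this commit addresses: Job.run() previously resolved wait_for_job() and get_job_url() through engine.get_instance(), and only the Python engine implemented those methods, so running a materialization job under the Spark engine raised AttributeError. A minimal sketch of the now-working call path, assuming a configured Hopsworks connection (the feature group name and version are illustrative):

import hsfs

connection = hsfs.connection()  # assumes a configured Hopsworks client
fs = connection.get_feature_store()
fg = fs.get_feature_group("transactions", version=1)

# Job.run() now prints the UI URL via util.get_job_url() and blocks in
# Job._wait_for_job(), so the same code path works on either engine.
fg.materialization_job.run(await_termination=True)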
35 changes: 31 additions & 4 deletions python/hsfs/core/job.py
@@ -15,7 +15,9 @@
 #
 
 import humps
-from hsfs import engine
+import time
+
+from hsfs import util
+from hsfs.client.exceptions import FeatureStoreException
 from hsfs.core import job_api
 from hsfs.core import job_schedule as js
@@ -58,7 +60,7 @@ def __init__(
     @classmethod
     def from_response_json(cls, json_dict):
         # Job config should not be decamelized when updated
-        config = json_dict.pop("config")
+        config = json_dict.pop("config", None)
         json_decamelized = humps.decamelize(json_dict)
         json_decamelized["config"] = config
         return cls(**json_decamelized)
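The new default in pop("config", None) matters because not every Jobs API response carries a config field. In isolation (hypothetical payload), the guarded pop behaves like this:

# Before this change, payload.pop("config") raised KeyError here.
payload = {"id": 1, "name": "my_job"}  # no "config" key
config = payload.pop("config", None)   # -> None instead of KeyError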
@@ -116,10 +118,10 @@ def run(self, args: str = None, await_termination: bool = True):
         self._job_api.launch(self.name, args=args)
         print(
             "Job started successfully, you can follow the progress at \n{}".format(
-                engine.get_instance().get_job_url(self.href)
+                util.get_job_url(self.href)
             )
         )
-        engine.get_instance().wait_for_job(self, await_termination=await_termination)
+        self._wait_for_job(await_termination=await_termination)
 
     def get_state(self):
         """Get the state of the job.
@@ -223,3 +225,28 @@ def _update_schedule(self, job_schedule):
             self._name, job_schedule.to_dict()
         )
         return self._job_schedule
+
+    def _wait_for_job(self, await_termination=True):
+        # If the user passed the wait_for_job option consider it,
+        # otherwise use the default True
+        while await_termination:
+            executions = self._job_api.last_execution(self)
+            if len(executions) > 0:
+                execution = executions[0]
+            else:
+                return
+
+            if execution.final_status.lower() == "succeeded":
+                return
+            elif execution.final_status.lower() == "failed":
+                raise FeatureStoreException(
+                    "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
+                )
+            elif execution.final_status.lower() == "killed":
+                raise FeatureStoreException("The Hopsworks Job was stopped")
+            elif execution.state.lower() == "framework_failure":
+                raise FeatureStoreException(
+                    "The Hopsworks Job monitoring failed, could not determine the final status"
+                )
+
+            time.sleep(3)
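A caller-side sketch of the contract the new Job._wait_for_job() establishes (the job handle is assumed to come from feature_group.materialization_job or similar): it returns once the last execution reports "succeeded", raises FeatureStoreException when the final status is "failed" or "killed" or the state is "framework_failure", and otherwise polls last_execution every 3 seconds.

from hsfs.client.exceptions import FeatureStoreException

try:
    job.run(await_termination=True)  # blocks in Job._wait_for_job()
except FeatureStoreException as err:
    # Raised for "failed", "killed", or "framework_failure" executions.
    print("Job did not complete: {}".format(err))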
57 changes: 6 additions & 51 deletions python/hsfs/engine/python.py
@@ -17,7 +17,6 @@
 import pandas as pd
 import numpy as np
 import boto3
-import time
 import re
 import ast
 import warnings
@@ -39,7 +38,6 @@
 
 from io import BytesIO
 from pyhive import hive
-from urllib.parse import urlparse
 from typing import TypeVar, Optional, Dict, Any
 from confluent_kafka import Consumer, Producer, TopicPartition, KafkaError
 from tqdm.auto import tqdm
@@ -65,7 +63,7 @@
 )
 from hsfs.constructor import query
 from hsfs.training_dataset_split import TrainingDatasetSplit
-from hsfs.client import exceptions, hopsworks
+from hsfs.client import hopsworks
 from hsfs.feature_group import FeatureGroup
 from thrift.transport.TTransport import TTransportException
 from pyhive.exc import OperationalError
@@ -384,11 +382,11 @@ def profile_by_spark(self, metadata_instance):
         job = stat_api.compute(metadata_instance)
         print(
             "Statistics Job started successfully, you can follow the progress at \n{}".format(
-                self.get_job_url(job.href)
+                util.get_job_url(job.href)
             )
         )
 
-        self.wait_for_job(job)
+        job._wait_for_job()
         return job
 
     def profile(
@@ -807,15 +805,13 @@ def write_training_dataset(
         td_job = td_api.compute(training_dataset, td_app_conf)
         print(
             "Training dataset job started successfully, you can follow the progress at \n{}".format(
-                self.get_job_url(td_job.href)
+                util.get_job_url(td_job.href)
             )
         )
 
-        self.wait_for_job(
-            td_job,
-            await_termination=user_write_options.get("wait_for_job", True),
-        )
+        td_job._wait_for_job(
+            await_termination=user_write_options.get("wait_for_job", True)
+        )
 
         return td_job
 
     def _create_hive_connection(self, feature_store, hive_config=None):
@@ -882,22 +878,6 @@ def save_empty_dataframe(self, feature_group):
         """Wrapper around save_dataframe in order to provide no-op."""
         pass
 
-    def get_job_url(self, href: str):
-        """Use the endpoint returned by the API to construct the UI url for jobs
-        Args:
-            href (str): the endpoint returned by the API
-        """
-        url = urlparse(href)
-        url_splits = url.path.split("/")
-        project_id = url_splits[4]
-        job_name = url_splits[6]
-        ui_url = url._replace(
-            path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
-        )
-        ui_url = client.get_instance().replace_public_host(ui_url)
-        return ui_url.geturl()
-
     def _get_app_options(self, user_write_options={}):
         """
         Generate the options that should be passed to the application doing the ingestion.
@@ -916,31 +896,6 @@ def _get_app_options(self, user_write_options={}):
             spark_job_configuration=spark_job_configuration,
         )
 
-    def wait_for_job(self, job, await_termination=True):
-        # If the user passed the wait_for_job option consider it,
-        # otherwise use the default True
-        while await_termination:
-            executions = self._job_api.last_execution(job)
-            if len(executions) > 0:
-                execution = executions[0]
-            else:
-                return
-
-            if execution.final_status.lower() == "succeeded":
-                return
-            elif execution.final_status.lower() == "failed":
-                raise exceptions.FeatureStoreException(
-                    "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
-                )
-            elif execution.final_status.lower() == "killed":
-                raise exceptions.FeatureStoreException("The Hopsworks Job was stopped")
-            elif execution.state.lower() == "framework_failure":
-                raise exceptions.FeatureStoreException(
-                    "The Hopsworks Job monitoring failed, could not determine the final status"
-                )
-
-            time.sleep(3)
-
     def add_file(self, file):
         if not file:
             return file
17 changes: 17 additions & 0 deletions python/hsfs/util.py
@@ -335,6 +335,23 @@ def verify_attribute_key_names(feature_group_obj, external_feature_group=False):
         )
 
 
+def get_job_url(href: str):
+    """Use the endpoint returned by the API to construct the UI url for jobs
+    Args:
+        href (str): the endpoint returned by the API
+    """
+    url = urlparse(href)
+    url_splits = url.path.split("/")
+    project_id = url_splits[4]
+    job_name = url_splits[6]
+    ui_url = url._replace(
+        path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
+    )
+    ui_url = client.get_instance().replace_public_host(ui_url)
+    return ui_url.geturl()
+
+
 def translate_legacy_spark_type(output_type):
     if output_type == "StringType()":
         return "STRING"
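To illustrate what the relocated helper computes (host, project id, and job name below are hypothetical): get_job_url() rewrites a Jobs API endpoint into the corresponding executions page of the Hopsworks UI; replace_public_host() then substitutes the externally reachable hostname, so an initialized hsfs client is required for the call to succeed.

from hsfs import util

href = "https://hopsworks.example.com/hopsworks-api/api/project/119/jobs/fg_1_offline_fg_materialization"
print(util.get_job_url(href))
# Expected shape, with the public host substituted by the client:
# https://<public-host>/p/119/jobs/named/fg_1_offline_fg_materialization/executions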
89 changes: 88 additions & 1 deletion python/tests/core/test_job.py
@@ -15,7 +15,10 @@
 #
 
 
-from hsfs.core import job
+import pytest
+
+from hsfs.core import job, execution
+from hsfs.client import exceptions
 
 
 class TestJob:
@@ -44,3 +47,87 @@ def test_from_response_json_empty(self, backend_fixtures):
         assert j.name == "test_name"
         assert j.executions is None
         assert j.href is None
+
+    def test_wait_for_job(self, mocker, backend_fixtures):
+        # Arrange
+        mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
+
+        json = backend_fixtures["job"]["get"]["response"]
+        j = job.Job.from_response_json(json)
+
+        # Act
+        j._wait_for_job()
+
+        # Assert
+        assert mock_job_api.return_value.last_execution.call_count == 1
+
+    def test_wait_for_job_wait_for_job_false(self, mocker, backend_fixtures):
+        # Arrange
+        mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
+
+        json = backend_fixtures["job"]["get"]["response"]
+        j = job.Job.from_response_json(json)
+
+        # Act
+        j._wait_for_job(False)
+
+        # Assert
+        assert mock_job_api.return_value.last_execution.call_count == 0
+
+    def test_wait_for_job_final_status_succeeded(self, mocker, backend_fixtures):
+        # Arrange
+        mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
+
+        json = backend_fixtures["job"]["get"]["response"]
+        j = job.Job.from_response_json(json)
+
+        mock_job_api.return_value.last_execution.return_value = [
+            execution.Execution(id=1, state=None, final_status="succeeded")
+        ]
+
+        # Act
+        j._wait_for_job()
+
+        # Assert
+        assert mock_job_api.return_value.last_execution.call_count == 1
+
+    def test_wait_for_job_final_status_failed(self, mocker, backend_fixtures):
+        # Arrange
+        mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
+
+        json = backend_fixtures["job"]["get"]["response"]
+        j = job.Job.from_response_json(json)
+
+        mock_job_api.return_value.last_execution.return_value = [
+            execution.Execution(id=1, state=None, final_status="failed")
+        ]
+
+        # Act
+        with pytest.raises(exceptions.FeatureStoreException) as e_info:
+            j._wait_for_job()
+
+        # Assert
+        assert mock_job_api.return_value.last_execution.call_count == 1
+        assert (
+            str(e_info.value)
+            == "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
+        )
+
+    def test_wait_for_job_final_status_killed(self, mocker, backend_fixtures):
+        # Arrange
+        mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
+
+        json = backend_fixtures["job"]["get"]["response"]
+        j = job.Job.from_response_json(json)
+
+        mock_job_api.return_value.last_execution.return_value = [
+            execution.Execution(id=1, state=None, final_status="killed")
+        ]
+
+        # Act
+        with pytest.raises(exceptions.FeatureStoreException) as e_info:
+            j._wait_for_job()
+
+        # Assert
+        assert mock_job_api.return_value.last_execution.call_count == 1
+        assert str(e_info.value) == "The Hopsworks Job was stopped"
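One branch of _wait_for_job(), the "framework_failure" state, is not exercised by the tests above. A possible companion test in the same style (not part of this commit), assuming a final_status of "undefined" so the earlier status checks fall through:

    def test_wait_for_job_state_framework_failure(self, mocker, backend_fixtures):
        # Arrange
        mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")

        json = backend_fixtures["job"]["get"]["response"]
        j = job.Job.from_response_json(json)

        mock_job_api.return_value.last_execution.return_value = [
            execution.Execution(
                id=1, state="framework_failure", final_status="undefined"
            )
        ]

        # Act
        with pytest.raises(exceptions.FeatureStoreException) as e_info:
            j._wait_for_job()

        # Assert
        assert (
            str(e_info.value)
            == "The Hopsworks Job monitoring failed, could not determine the final status"
        )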
(Diff truncated: the remaining three changed files did not load in this capture.)