#51 Show error log on job failure #218

Merged · 6 commits · May 12, 2022
feathr_project/feathr/_abc.py (0 additions & 1 deletion)
@@ -56,4 +56,3 @@ def get_status(self) -> str:
str: _description_
"""
pass

feathr_project/feathr/_synapse_submission.py (16 additions & 3 deletions)
@@ -7,14 +7,14 @@
from urllib.parse import urlparse
from os.path import basename
from enum import Enum

from azure.identity import (ChainedTokenCredential, DefaultAzureCredential,
DeviceCodeCredential, EnvironmentCredential,
ManagedIdentityCredential)
from azure.storage.filedatalake import DataLakeServiceClient
from azure.synapse.spark import SparkClient
from azure.synapse.spark.models import SparkBatchJobOptions
from loguru import logger
from requests import request
from tqdm import tqdm

from feathr._abc import SparkJobLauncher
@@ -51,6 +51,9 @@ def __init__(self, synapse_dev_url: str, pool_name: str, datalake_dir: str, exec
synapse_dev_url, pool_name, executor_size=executor_size, executors=executors, credential=self.credential)
self._datalake = _DataLakeFiler(
datalake_dir, credential=self.credential)
# Save Synapse parameters to retrieve driver log
self._synapse_dev_url = synapse_dev_url
self._pool_name = pool_name

def upload_or_get_cloud_path(self, local_path_or_http_path: str):
"""
@@ -132,6 +135,8 @@ def wait_for_completion(self, timeout_seconds: Optional[float]) -> bool:
if status in {LivyStates.SUCCESS.value}:
return True
elif status in {LivyStates.ERROR.value, LivyStates.DEAD.value, LivyStates.KILLED.value}:
logger.error("Feathr job has failed. Please visit this page to view error message: {}", self.job_url)
logger.error(self._api.get_driver_log(self.current_job_info.id))
return False
else:
time.sleep(30)
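
The hunk above is the substance of this PR: when the Livy job ends in ERROR, DEAD, or KILLED, the launcher now logs the job URL and the driver log instead of returning False silently. Below is a minimal standalone sketch of the same polling pattern, assuming an api object that exposes the get_spark_batch_job and get_driver_log methods shown in this diff (all other names and defaults are illustrative):

import time

TERMINAL_FAILURES = {"error", "dead", "killed"}  # Livy states treated as fatal

def wait_and_report(api, job_id, timeout_seconds=3600, poll_interval=30):
    """Poll a Livy batch job; on failure, surface the driver log (sketch only)."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        state = api.get_spark_batch_job(job_id).state.lower()
        if state == "success":
            return True
        if state in TERMINAL_FAILURES:
            # Same idea as the change above: pull the driver stdout so the
            # root cause is readable without opening the Synapse Studio UI.
            print(api.get_driver_log(job_id))
            return False
        time.sleep(poll_interval)
    raise TimeoutError("Spark job did not reach a terminal state before the timeout")
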
@@ -166,8 +171,6 @@ def get_job_tags(self) -> Dict[str, str]:
"""
return self._api.get_spark_batch_job(self.current_job_info.id).tags



class _SynapseJobRunner(object):
"""
Class to interact with Synapse Spark cluster
@@ -176,6 +179,7 @@ def __init__(self, synapse_dev_url, spark_pool_name, credential=None, executor_s
if credential is None:
logger.warning('No valid Azure credential detected. Using DefaultAzureCredential')
credential = DefaultAzureCredential()
self._credential = credential

self.client = SparkClient(
credential=credential,
@@ -269,6 +273,15 @@ def create_spark_batch_job(self, job_name, main_file, class_name=None,

return self.client.spark_batch.create_spark_batch_job(spark_batch_job_options, detailed=True)

def get_driver_log(self, job_id) -> str:
# @see: https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/connect-monitor-azure-synapse-spark-application-level-metrics
app_id = self.get_spark_batch_job(job_id).app_id
url = "%s/sparkhistory/api/v1/sparkpools/%s/livyid/%s/applications/%s/driverlog/stdout/?isDownload=true" % (self._synapse_dev_url, self._pool_name, job_id, app_id)
token = self._credential.get_token("https://dev.azuresynapse.net/.default").token
req = urllib.request.Request(url=url, headers={"authorization": "Bearer %s" % token})
resp = urllib.request.urlopen(req)
return resp.read()
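
The new get_driver_log helper hits the Synapse Spark history server REST endpoint for the application's driver stdout and authenticates with a bearer token minted by the azure-identity credential. The same request can be reproduced outside the class; the sketch below assumes you already have the workspace dev endpoint, Spark pool name, Livy job id, and Spark application id (the function name and parameter names are illustrative):

import urllib.request

from azure.identity import DefaultAzureCredential

def fetch_driver_stdout(synapse_dev_url: str, pool_name: str, livy_id: int, app_id: str) -> bytes:
    """Download the driver stdout log for a Synapse Spark application (sketch)."""
    url = (
        f"{synapse_dev_url}/sparkhistory/api/v1/sparkpools/{pool_name}"
        f"/livyid/{livy_id}/applications/{app_id}/driverlog/stdout/?isDownload=true"
    )
    # Token for the Synapse data plane, same scope as used in the diff above.
    token = DefaultAzureCredential().get_token("https://dev.azuresynapse.net/.default")
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token.token}"})
    with urllib.request.urlopen(req) as resp:
        return resp.read()
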


class _DataLakeFiler(object):
"""