diff --git a/feathr_project/feathr/_abc.py b/feathr_project/feathr/_abc.py
index 0d371ca34..b7ecc907d 100644
--- a/feathr_project/feathr/_abc.py
+++ b/feathr_project/feathr/_abc.py
@@ -56,4 +56,3 @@ def get_status(self) -> str:
             str: _description_
         """
         pass
-
diff --git a/feathr_project/feathr/_synapse_submission.py b/feathr_project/feathr/_synapse_submission.py
index 727eebe52..6dfbe8319 100644
--- a/feathr_project/feathr/_synapse_submission.py
+++ b/feathr_project/feathr/_synapse_submission.py
@@ -7,7 +7,6 @@
 from urllib.parse import urlparse
 from os.path import basename
 from enum import Enum
-
 from azure.identity import (ChainedTokenCredential, DefaultAzureCredential,
                             DeviceCodeCredential, EnvironmentCredential,
                             ManagedIdentityCredential)
@@ -15,6 +14,7 @@
 from azure.synapse.spark import SparkClient
 from azure.synapse.spark.models import SparkBatchJobOptions
 from loguru import logger
+from requests import request
 from tqdm import tqdm
 
 from feathr._abc import SparkJobLauncher
@@ -51,6 +51,9 @@ def __init__(self, synapse_dev_url: str, pool_name: str, datalake_dir: str, exec
             synapse_dev_url, pool_name, executor_size=executor_size, executors=executors, credential=self.credential)
         self._datalake = _DataLakeFiler(
             datalake_dir, credential=self.credential)
+        # Save Synapse parameters to retrieve driver log
+        self._synapse_dev_url = synapse_dev_url
+        self._pool_name = pool_name
 
     def upload_or_get_cloud_path(self, local_path_or_http_path: str):
         """
@@ -132,6 +135,8 @@ def wait_for_completion(self, timeout_seconds: Optional[float]) -> bool:
             if status in {LivyStates.SUCCESS.value}:
                 return True
             elif status in {LivyStates.ERROR.value, LivyStates.DEAD.value, LivyStates.KILLED.value}:
+                logger.error("Feathr job has failed. Please visit this page to view error message: {}", self.job_url)
+                logger.error(self._api.get_driver_log(self.current_job_info.id))
                 return False
             else:
                 time.sleep(30)
@@ -166,8 +171,6 @@ def get_job_tags(self) -> Dict[str, str]:
         """
         return self._api.get_spark_batch_job(self.current_job_info.id).tags
 
-
-
 class _SynapseJobRunner(object):
     """
     Class to interact with Synapse Spark cluster
@@ -176,6 +179,10 @@ def __init__(self, synapse_dev_url, spark_pool_name, credential=None, executor_s
         if credential is None:
             logger.warning('No valid Azure credential detected. Using DefaultAzureCredential')
             credential = DefaultAzureCredential()
+        self._credential = credential
+        # Also keep the workspace endpoint and pool name so get_driver_log can build the history-server URL
+        self._synapse_dev_url = synapse_dev_url
+        self._pool_name = spark_pool_name
 
         self.client = SparkClient(
             credential=credential,
@@ -269,6 +276,15 @@ def create_spark_batch_job(self, job_name, main_file, class_name=None,
 
         return self.client.spark_batch.create_spark_batch_job(spark_batch_job_options, detailed=True)
 
+    def get_driver_log(self, job_id) -> str:
+        # @see: https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/connect-monitor-azure-synapse-spark-application-level-metrics
+        app_id = self.get_spark_batch_job(job_id).app_id
+        url = "%s/sparkhistory/api/v1/sparkpools/%s/livyid/%s/applications/%s/driverlog/stdout/?isDownload=true" % (self._synapse_dev_url, self._pool_name, job_id, app_id)
+        token = self._credential.get_token("https://dev.azuresynapse.net/.default").token
+        resp = request("GET", url, headers={"authorization": "Bearer %s" % token})
+        resp.raise_for_status()
+        return resp.text
+
 class _DataLakeFiler(object):
     """
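
For reference, a minimal standalone sketch of the Spark history-server request that the new get_driver_log method issues. The workspace endpoint, pool name, Livy batch id, and application id below are hypothetical placeholders, and the snippet assumes the requests and azure-identity packages are installed with a credential that can reach the Synapse workspace.

    # Standalone sketch of the Synapse driver-log download; all identifiers below
    # (workspace URL, pool name, Livy id, app id) are illustrative placeholders.
    from azure.identity import DefaultAzureCredential
    from requests import request

    synapse_dev_url = "https://my-workspace.dev.azuresynapse.net"  # hypothetical workspace endpoint
    pool_name = "spark31"                                          # hypothetical Spark pool
    job_id = 42                                                    # Livy batch id of the failed job
    app_id = "application_1650000000000_0001"                      # Spark application id for that batch

    # Acquire an AAD token for the Synapse data-plane scope used in the patch.
    token = DefaultAzureCredential().get_token("https://dev.azuresynapse.net/.default").token

    # Same history-server route that _SynapseJobRunner.get_driver_log builds.
    url = "%s/sparkhistory/api/v1/sparkpools/%s/livyid/%s/applications/%s/driverlog/stdout/?isDownload=true" % (
        synapse_dev_url, pool_name, job_id, app_id)
    resp = request("GET", url, headers={"authorization": "Bearer %s" % token})
    resp.raise_for_status()
    print(resp.text)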