diff --git a/feathr_project/feathr/constants.py b/feathr_project/feathr/constants.py
index 20d57675a..8884753f5 100644
--- a/feathr_project/feathr/constants.py
+++ b/feathr_project/feathr/constants.py
@@ -24,4 +24,4 @@
 TYPEDEF_ARRAY_DERIVED_FEATURE=f"array"
 TYPEDEF_ARRAY_ANCHOR_FEATURE=f"array"
 
-FEATHR_JAR_MAVEN_REPO="com.linkedin.feathr:feathr_2.12:0.4.0"
\ No newline at end of file
+FEATHR_MAVEN_ARTIFACT="com.linkedin.feathr:feathr_2.12:0.4.0"
\ No newline at end of file
diff --git a/feathr_project/feathr/spark_provider/_databricks_submission.py b/feathr_project/feathr/spark_provider/_databricks_submission.py
index 579e00527..114e0728a 100644
--- a/feathr_project/feathr/spark_provider/_databricks_submission.py
+++ b/feathr_project/feathr/spark_provider/_databricks_submission.py
@@ -143,9 +143,9 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name:
             submission_params['new_cluster']['spark_conf'] = configuration
             submission_params['new_cluster']['custom_tags'] = job_tags
         # the feathr main jar file is anyway needed regardless it's pyspark or scala spark
-        if main_jar_path is None:
+        if main_jar_path is None or main_jar_path=="":
             logger.info("Main JAR file is not set, using default package from Maven")
-            submission_params['libraries'][0]['maven'] = { "coordinates": FEATHR_JAR_MAVEN_REPO }
+            submission_params['libraries'][0]['maven'] = { "coordinates": FEATHR_MAVEN_ARTIFACT }
         else:
             submission_params['libraries'][0]['jar'] = self.upload_or_get_cloud_path(main_jar_path)
         # see here for the submission parameter definition https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--request-structure-6
diff --git a/feathr_project/feathr/spark_provider/_synapse_submission.py b/feathr_project/feathr/spark_provider/_synapse_submission.py
index e5079d5e7..77d836e8f 100644
--- a/feathr_project/feathr/spark_provider/_synapse_submission.py
+++ b/feathr_project/feathr/spark_provider/_synapse_submission.py
@@ -97,32 +97,45 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_clas
             job_name (str): name of the job
             main_jar_path (str): main file paths, usually your main jar file
             main_class_name (str): name of your main class
-            arguments (str): all the arugments you want to pass into the spark job
-            job_tags (str): tags of the job, for exmaple you might want to put your user ID, or a tag with a certain information
+            arguments (str): all the arguments you want to pass into the spark job
+            job_tags (str): tags of the job, for example you might want to put your user ID, or a tag with a certain information
             configuration (Dict[str, str]): Additional configs for the spark job
         """
 
-        # Use an no-op jar as the main executable, as we must set the `main_file` in order to submit a Spark job to Azure Synapse
         cfg = configuration.copy()  # We don't want to mess up input parameters
-        if main_jar_path is None:
-            logger.info("Main JAR file is not set, using default package from Maven")
-            if "spark.jars.packages" in cfg:
-                cfg["spark.jars.packages"] = ",".join(
-                    [cfg["spark.jars.packages"], FEATHR_JAR_MAVEN_REPO])
+        if not main_jar_path:
+            # We don't have the main jar, use Maven
+            if not python_files:
+                # This is a JAR job
+                # Azure Synapse/Livy doesn't allow JAR job starts from Maven directly, we must have a jar file uploaded.
+                # so we have to use a dummy jar as the main file.
+                logger.info("Main JAR file is not set, using default package from Maven")
+                # Add Maven dependency to the job
+                if "spark.jars.packages" in cfg:
+                    cfg["spark.jars.packages"] = ",".join(
+                        [cfg["spark.jars.packages"], FEATHR_MAVEN_ARTIFACT])
+                else:
+                    cfg["spark.jars.packages"] = FEATHR_MAVEN_ARTIFACT
+                # Use the no-op jar as the main file
+                # This is a dummy jar which contains only one `org.example.Noop` class with one empty `main` function which does nothing
+                current_dir = pathlib.Path(__file__).parent.resolve()
+                main_jar_path = os.path.join(current_dir, "noop-1.0.jar")
             else:
-                cfg["spark.jars.packages"] = FEATHR_JAR_MAVEN_REPO
-            current_dir = pathlib.Path(__file__).parent.resolve()
-            main_jar_path = os.path.join(current_dir, "noop-1.0.jar")
-        if main_jar_path.startswith('abfs'):
-            main_jar_cloud_path = main_jar_path
-            logger.info(
-                'Cloud path {} is used for running the job: {}', main_jar_path, job_name)
-        else:
-            logger.info('Uploading jar from {} to cloud for running job: {}',
-                main_jar_path, job_name)
-            main_jar_cloud_path = self._datalake.upload_file_to_workdir(main_jar_path)
-            logger.info('{} is uploaded to {} for running job: {}',
-                main_jar_path, main_jar_cloud_path, job_name)
+            # This is a PySpark job
+            pass
+        main_jar_cloud_path = None
+        if main_jar_path:
+            # Now we have a main jar, either feathr or noop
+            if main_jar_path.startswith('abfs'):
+                main_jar_cloud_path = main_jar_path
+                logger.info(
+                    'Cloud path {} is used for running the job: {}', main_jar_path, job_name)
+            else:
+                logger.info('Uploading jar from {} to cloud for running job: {}',
+                    main_jar_path, job_name)
+                main_jar_cloud_path = self._datalake.upload_file_to_workdir(main_jar_path)
+                logger.info('{} is uploaded to {} for running job: {}',
+                    main_jar_path, main_jar_cloud_path, job_name)
 
         reference_file_paths = []
         for file_path in reference_files_path:
@@ -263,8 +276,13 @@ def create_spark_batch_job(self, job_name, main_file, class_name=None,
         executor_cores = self.EXECUTOR_SIZE[self._executor_size]['Cores']
         executor_memory = self.EXECUTOR_SIZE[self._executor_size]['Memory']
 
-        # need to put the jar in as dependencies for pyspark job
-        jars = jars + [main_file]
+        # If we have a main jar, it needs to be added as dependencies for pyspark job
+        # Otherwise it's a PySpark job with Feathr JAR from Maven
+        if main_file:
+            if not python_files:
+                # These 2 parameters should not be empty at the same time
+                raise ValueError("Main JAR is not set for the Spark job")
+            jars = jars + [main_file]
 
         # If file=main_file, then it's using only Scala Spark
         # If file=python_files[0], then it's using Pyspark
diff --git a/feathr_project/feathr/spark_provider/noop-1.0.jar b/feathr_project/feathr/spark_provider/noop-1.0.jar
index 24f7cb775..6b3b9ba56 100644
Binary files a/feathr_project/feathr/spark_provider/noop-1.0.jar and b/feathr_project/feathr/spark_provider/noop-1.0.jar differ
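
For context (an editor's illustration, not part of the patch): the Synapse fallback above reduces to roughly the sketch below. `FEATHR_MAVEN_ARTIFACT`, the `spark.jars.packages` handling, and `noop-1.0.jar` are taken from the diff; the standalone helper name `resolve_synapse_main_jar` and its signature are assumptions made only for this example.

```python
import os
import pathlib
from typing import Dict, List, Optional

# Taken from feathr_project/feathr/constants.py in this patch
FEATHR_MAVEN_ARTIFACT = "com.linkedin.feathr:feathr_2.12:0.4.0"

def resolve_synapse_main_jar(main_jar_path: Optional[str],
                             python_files: Optional[List[str]],
                             cfg: Dict[str, str]) -> Optional[str]:
    """Sketch of the fallback added to submit_feathr_job for Azure Synapse."""
    if not main_jar_path:
        if not python_files:
            # JAR job: Synapse/Livy cannot start a job from a Maven coordinate alone,
            # so pull Feathr from Maven and use the bundled no-op jar as the main file.
            if "spark.jars.packages" in cfg:
                cfg["spark.jars.packages"] = ",".join(
                    [cfg["spark.jars.packages"], FEATHR_MAVEN_ARTIFACT])
            else:
                cfg["spark.jars.packages"] = FEATHR_MAVEN_ARTIFACT
            current_dir = pathlib.Path(__file__).parent.resolve()
            main_jar_path = os.path.join(current_dir, "noop-1.0.jar")
        # else: PySpark job, python_files[0] becomes the main file and no jar path is needed
    return main_jar_path
```

With this change a caller can omit `main_jar_path` entirely: on Synapse a JAR job falls back to the bundled no-op jar plus the Maven artifact, while on Databricks the same situation is handled by setting `libraries[0]['maven']` to the same coordinate.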