#211 Enable using maven package
windoze committed Jun 9, 2022
1 parent 604ef87 commit dc6066c
Showing 4 changed files with 44 additions and 26 deletions.
2 changes: 1 addition & 1 deletion feathr_project/feathr/constants.py
@@ -24,4 +24,4 @@
 TYPEDEF_ARRAY_DERIVED_FEATURE=f"array<feathr_derived_feature_{REGISTRY_TYPEDEF_VERSION}>"
 TYPEDEF_ARRAY_ANCHOR_FEATURE=f"array<feathr_anchor_feature_{REGISTRY_TYPEDEF_VERSION}>"

-FEATHR_JAR_MAVEN_REPO="com.linkedin.feathr:feathr_2.12:0.4.0"
+FEATHR_MAVEN_ARTIFACT="com.linkedin.feathr:feathr_2.12:0.4.0"
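
For reference, the value of the renamed constant is a standard Maven coordinate. A minimal sketch (not part of the commit) of how such a coordinate splits into its group, artifact, and version parts:

    # Illustration only: the coordinate follows the "groupId:artifactId:version" convention.
    FEATHR_MAVEN_ARTIFACT = "com.linkedin.feathr:feathr_2.12:0.4.0"
    group_id, artifact_id, version = FEATHR_MAVEN_ARTIFACT.split(":")
    # group_id    -> "com.linkedin.feathr"
    # artifact_id -> "feathr_2.12"  (the Scala 2.12 build of the Feathr engine)
    # version     -> "0.4.0"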
4 changes: 2 additions & 2 deletions feathr_project/feathr/spark_provider/_databricks_submission.py
@@ -143,9 +143,9 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name:
             submission_params['new_cluster']['spark_conf'] = configuration
             submission_params['new_cluster']['custom_tags'] = job_tags
         # the feathr main jar file is anyway needed regardless it's pyspark or scala spark
-        if main_jar_path is None:
+        if main_jar_path is None or main_jar_path=="":
             logger.info("Main JAR file is not set, using default package from Maven")
-            submission_params['libraries'][0]['maven'] = { "coordinates": FEATHR_JAR_MAVEN_REPO }
+            submission_params['libraries'][0]['maven'] = { "coordinates": FEATHR_MAVEN_ARTIFACT }
         else:
             submission_params['libraries'][0]['jar'] = self.upload_or_get_cloud_path(main_jar_path)
         # see here for the submission parameter definition https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--request-structure-6
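
To illustrate the effect of this change (not part of the diff): depending on whether a main JAR path is provided, the first entry of the Databricks `libraries` list ends up in one of two shapes. The values below are examples, and the import path for the constant is assumed.

    # Sketch only: two possible shapes of submission_params['libraries'][0] after this change.
    from feathr.constants import FEATHR_MAVEN_ARTIFACT  # assumed import path

    # No main jar supplied: resolve the Feathr package from Maven.
    libraries_from_maven = [{"maven": {"coordinates": FEATHR_MAVEN_ARTIFACT}}]

    # Main jar supplied: upload it (or reuse a cloud path) and reference it directly.
    libraries_from_jar = [{"jar": "dbfs:/FileStore/jars/feathr-assembly.jar"}]  # example path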
64 changes: 41 additions & 23 deletions feathr_project/feathr/spark_provider/_synapse_submission.py
@@ -97,32 +97,45 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_clas
             job_name (str): name of the job
             main_jar_path (str): main file paths, usually your main jar file
             main_class_name (str): name of your main class
-            arguments (str): all the arugments you want to pass into the spark job
-            job_tags (str): tags of the job, for exmaple you might want to put your user ID, or a tag with a certain information
+            arguments (str): all the arguments you want to pass into the spark job
+            job_tags (str): tags of the job, for example you might want to put your user ID, or a tag with a certain information
             configuration (Dict[str, str]): Additional configs for the spark job
         """

-        # Use an no-op jar as the main executable, as we must set the `main_file` in order to submit a Spark job to Azure Synapse
         cfg = configuration.copy() # We don't want to mess up input parameters
-        if main_jar_path is None:
-            logger.info("Main JAR file is not set, using default package from Maven")
-            if "spark.jars.packages" in cfg:
-                cfg["spark.jars.packages"] = ",".join(
-                    [cfg["spark.jars.packages"], FEATHR_JAR_MAVEN_REPO])
+        if not main_jar_path:
+            # We don't have the main jar, use Maven
+            if not python_files:
+                # This is a JAR job
+                # Azure Synapse/Livy doesn't allow JAR job starts from Maven directly, we must have a jar file uploaded.
+                # so we have to use a dummy jar as the main file.
+                logger.info("Main JAR file is not set, using default package from Maven")
+                # Add Maven dependency to the job
+                if "spark.jars.packages" in cfg:
+                    cfg["spark.jars.packages"] = ",".join(
+                        [cfg["spark.jars.packages"], FEATHR_MAVEN_ARTIFACT])
+                else:
+                    cfg["spark.jars.packages"] = FEATHR_MAVEN_ARTIFACT
+                # Use the no-op jar as the main file
+                # This is a dummy jar which contains only one `org.example.Noop` class with one empty `main` function which does nothing
+                current_dir = pathlib.Path(__file__).parent.resolve()
+                main_jar_path = os.path.join(current_dir, "noop-1.0.jar")
             else:
-                cfg["spark.jars.packages"] = FEATHR_JAR_MAVEN_REPO
-            current_dir = pathlib.Path(__file__).parent.resolve()
-            main_jar_path = os.path.join(current_dir, "noop-1.0.jar")
-        if main_jar_path.startswith('abfs'):
-            main_jar_cloud_path = main_jar_path
-            logger.info(
-                'Cloud path {} is used for running the job: {}', main_jar_path, job_name)
-        else:
-            logger.info('Uploading jar from {} to cloud for running job: {}',
-                        main_jar_path, job_name)
-            main_jar_cloud_path = self._datalake.upload_file_to_workdir(main_jar_path)
-            logger.info('{} is uploaded to {} for running job: {}',
-                        main_jar_path, main_jar_cloud_path, job_name)
+                # This is a PySpark job
+                pass
+        main_jar_cloud_path = None
+        if main_jar_path:
+            # Now we have a main jar, either feathr or noop
+            if main_jar_path.startswith('abfs'):
+                main_jar_cloud_path = main_jar_path
+                logger.info(
+                    'Cloud path {} is used for running the job: {}', main_jar_path, job_name)
+            else:
+                logger.info('Uploading jar from {} to cloud for running job: {}',
+                            main_jar_path, job_name)
+                main_jar_cloud_path = self._datalake.upload_file_to_workdir(main_jar_path)
+                logger.info('{} is uploaded to {} for running job: {}',
+                            main_jar_path, main_jar_cloud_path, job_name)

         reference_file_paths = []
         for file_path in reference_files_path:
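
A standalone sketch of the spark.jars.packages merge behaviour added above (illustration only, not the commit's code):

    # If the caller already set spark.jars.packages, the Feathr artifact is appended to the
    # comma-separated list; otherwise it becomes the only entry.
    def merge_maven_package(cfg: dict, artifact: str) -> dict:
        if "spark.jars.packages" in cfg:
            cfg["spark.jars.packages"] = ",".join([cfg["spark.jars.packages"], artifact])
        else:
            cfg["spark.jars.packages"] = artifact
        return cfg

    merge_maven_package({"spark.jars.packages": "org.apache.hadoop:hadoop-azure:3.2.0"},
                        "com.linkedin.feathr:feathr_2.12:0.4.0")
    # -> {"spark.jars.packages": "org.apache.hadoop:hadoop-azure:3.2.0,com.linkedin.feathr:feathr_2.12:0.4.0"}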
@@ -263,8 +276,13 @@ def create_spark_batch_job(self, job_name, main_file, class_name=None,
         executor_cores = self.EXECUTOR_SIZE[self._executor_size]['Cores']
         executor_memory = self.EXECUTOR_SIZE[self._executor_size]['Memory']

-        # need to put the jar in as dependencies for pyspark job
-        jars = jars + [main_file]
+        # If we have a main jar, it needs to be added as dependencies for pyspark job
+        # Otherwise it's a PySpark job with Feathr JAR from Maven
+        if main_file:
+            if not python_files:
+                # These 2 parameters should not be empty at the same time
+                raise ValueError("Main JAR is not set for the Spark job")
+            jars = jars + [main_file]

         # If file=main_file, then it's using only Scala Spark
         # If file=python_files[0], then it's using Pyspark
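
For context, an illustration (hypothetical helper, not part of the commit) of the dispatch the two comments above describe: which path is used as the batch job's entry `file`, and where the main jar goes.

    # Hypothetical helper: mirrors the file/jars split described by the comments above.
    def build_entry(main_file, python_files, jars):
        if python_files:
            # PySpark job: the first python file drives the job; the main jar (if any) is a dependency.
            return {"file": python_files[0], "jars": jars}
        # Scala Spark job: the jar itself is the entry point.
        return {"file": main_file, "jars": jars}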
Binary file modified feathr_project/feathr/spark_provider/noop-1.0.jar
