Fix local submission issues. (feathr-ai#988)
* Fix local submission issues

* Update _localspark_submission.py

* Update client.py

* Update _localspark_submission.py

* Update _localspark_submission.py
xiaoyongzhu authored Jan 18, 2023
1 parent 4493867 commit b706da0
Showing 2 changed files with 17 additions and 4 deletions.
feathr_project/feathr/client.py (2 additions, 2 deletions)

@@ -469,9 +469,9 @@ def _construct_redis_key(self, feature_table, key):
     def _str_to_bool(self, s: str, variable_name = None):
         """Define a function to detect convert string to bool, since Redis client sometimes require a bool and sometimes require a str
         """
-        if s.casefold() == 'True'.casefold() or s == True:
+        if (isinstance(s, str) and s.casefold() == 'True'.casefold()) or s == True:
             return True
-        elif s.casefold() == 'False'.casefold() or s == False:
+        elif (isinstance(s, str) and s.casefold() == 'False'.casefold()) or s == False:
             return False
         else:
             self.logger.warning(f'{s} is not a valid Bool value. Maybe you want to double check if it is set correctly for {variable_name}.')
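The isinstance guard matters because bool has no casefold() method, so the old comparison raised AttributeError whenever a real boolean (rather than a string) was passed in. A minimal standalone sketch of the patched logic, using a plain print instead of the client's logger (illustrative only, not the Feathr client itself):

def str_to_bool(s, variable_name=None):
    # Only call casefold() on genuine strings; booleans fall through to the
    # equality checks instead of raising AttributeError as before.
    if (isinstance(s, str) and s.casefold() == "true") or s == True:
        return True
    elif (isinstance(s, str) and s.casefold() == "false") or s == False:
        return False
    print(f"{s} is not a valid Bool value. Maybe you want to double check if it is set correctly for {variable_name}.")

print(str_to_bool("TRUE"))   # True
print(str_to_bool(False))    # False (previously this raised AttributeError)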
feathr_project/feathr/spark_provider/_localspark_submission.py (15 additions, 2 deletions)

@@ -84,7 +84,10 @@ def submit_feathr_job(
         cfg = configuration.copy() if configuration else {}
         maven_dependency = f"{cfg.pop('spark.jars.packages', self.packages)},{get_maven_artifact_fullname()}"
         spark_args = self._init_args(job_name=job_name, confs=cfg)
 
+        # Add additional repositories
+        spark_args.extend(["--repositories", "https://repository.mulesoft.org/nexus/content/repositories/public/,https://linkedin.jfrog.io/artifactory/open-source/"])
+        # spark_args.extend(["--repositories", "https://linkedin.jfrog.io/artifactory/open-source/"])
 
         if not main_jar_path:
             # We don't have the main jar, use Maven
             if not python_files:
@@ -106,7 +109,16 @@ def submit_feathr_job(
                 print(python_files)
                 spark_args.append(python_files[0])
         else:
-            spark_args.extend(["--class", main_class_name, main_jar_path])
+            if not python_files:
+                # This is a JAR job
+                spark_args.extend(["--class", main_class_name, main_jar_path])
+            else:
+                spark_args.extend(["--packages", maven_dependency])
+                # This is a PySpark job, no more things to
+                if python_files.__len__() > 1:
+                    spark_args.extend(["--py-files", ",".join(python_files[1:])])
+                spark_args.append(python_files[0])
 
 
         if arguments:
             spark_args.extend(arguments)
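To make the new branching above easier to follow, here is a hedged standalone sketch of how the patched code assembles the spark-submit argument list when a main jar is supplied; the helper name, job name, and artifact coordinates are illustrative, not the Feathr API:

def build_local_spark_args(job_name, main_jar_path, main_class_name,
                           python_files, maven_dependency):
    spark_args = ["--master", "local[*]", "--name", job_name]
    # Extra repositories so transitive Maven artifacts resolve during local runs.
    spark_args.extend(["--repositories",
                       "https://repository.mulesoft.org/nexus/content/repositories/public/,"
                       "https://linkedin.jfrog.io/artifactory/open-source/"])
    if not python_files:
        # JAR job: run the given class from the local main jar.
        spark_args.extend(["--class", main_class_name, main_jar_path])
    else:
        # PySpark job: pull the Feathr artifact via --packages, ship any extra
        # Python files with --py-files, and pass the entry script last.
        spark_args.extend(["--packages", maven_dependency])
        if len(python_files) > 1:
            spark_args.extend(["--py-files", ",".join(python_files[1:])])
        spark_args.append(python_files[0])
    return spark_args

print(" ".join(["spark-submit"] + build_local_spark_args(
    job_name="feathr_local_job",
    main_jar_path="feathr-assembly.jar",
    main_class_name="com.linkedin.feathr.offline.job.FeatureJoinJob",
    python_files=["feathr_pyspark_driver.py"],
    maven_dependency="com.linkedin.feathr:feathr_2.12:1.0.0")))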
@@ -299,4 +311,5 @@ def _get_default_package(self):
packages.append("commons-io:commons-io:2.6")
packages.append("org.apache.hadoop:hadoop-azure:2.7.4")
packages.append("com.microsoft.azure:azure-storage:8.6.4")
packages.append("com.github.everit-org.json-schema:org.everit.json.schema:1.9.1")
return ",".join(packages)
