diff --git a/aztk/node_scripts/scheduling/submit.py b/aztk/node_scripts/scheduling/submit.py
index f6eb26ed..14a6416c 100644
--- a/aztk/node_scripts/scheduling/submit.py
+++ b/aztk/node_scripts/scheduling/submit.py
@@ -33,8 +33,16 @@ def __app_submit_cmd(application):
     spark_submit_cmd.add_option("--name", application.name)
     spark_submit_cmd.add_option("--class", application.main_class)
     spark_submit_cmd.add_option("--jars", jars and ",".join(jars))
+    spark_submit_cmd.add_option("--packages", application.packages and '"{}"'.format(",".join(application.packages)))
+    spark_submit_cmd.add_option("--exclude-packages", application.exclude_packages
+                                and '"{}"'.format(",".join(application.exclude_packages)))
+    spark_submit_cmd.add_option("--repositories", application.repositories
+                                and '"{}"'.format(",".join(application.repositories)))
     spark_submit_cmd.add_option("--py-files", py_files and ",".join(py_files))
     spark_submit_cmd.add_option("--files", files and ",".join(files))
+    for key, val in application.conf.items():
+        spark_submit_cmd.add_option("--conf", '"{key}={val}"'.format(key=key, val=val))
+    spark_submit_cmd.add_option("--properties-file", application.properties_file)
     spark_submit_cmd.add_option("--driver-java-options", application.driver_java_options)
     spark_submit_cmd.add_option("--driver-library-path", application.driver_library_path)
     spark_submit_cmd.add_option("--driver-class-path", application.driver_class_path)
diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py
index 746fec10..687aaeaa 100644
--- a/aztk/spark/models/models.py
+++ b/aztk/spark/models/models.py
@@ -125,8 +125,13 @@ def __init__(
             application_args=None,
             main_class=None,
             jars=None,
+            packages=None,
+            exclude_packages=None,
+            repositories=None,
             py_files=None,
             files=None,
+            conf=None,
+            properties_file=None,
             driver_java_options=None,
             driver_library_path=None,
             driver_class_path=None,
@@ -141,8 +146,13 @@ def __init__(
         self.application_args = application_args
         self.main_class = main_class
         self.jars = jars or []
+        self.packages = packages or []
+        self.exclude_packages = exclude_packages or []
+        self.repositories = repositories or []
         self.py_files = py_files or []
         self.files = files or []
+        self.conf = conf or {}
+        self.properties_file = properties_file
         self.driver_java_options = driver_java_options
         self.driver_library_path = driver_library_path
         self.driver_class_path = driver_class_path
diff --git a/aztk_cli/config.py b/aztk_cli/config.py
index 10fe3d4c..a0a06e29 100644
--- a/aztk_cli/config.py
+++ b/aztk_cli/config.py
@@ -213,8 +213,13 @@ def _merge_dict(self, config):
                     application_args=application.get("application_args"),
                     main_class=application.get("main_class"),
                     jars=application.get("jars"),
+                    packages=application.get("packages"),
+                    exclude_packages=application.get("exclude_packages"),
+                    repositories=application.get("repositories"),
                     py_files=application.get("py_files"),
                     files=application.get("files"),
+                    conf=application.get("conf"),
+                    properties_file=application.get("properties_file"),
                     driver_java_options=application.get("driver_java_options"),
                     driver_library_path=application.get("driver_library_path"),
                     driver_class_path=application.get("driver_class_path"),
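Reviewer note on the `x and ",".join(x)` idiom in `__app_submit_cmd` above: it relies on `CommandBuilder.add_option` skipping options whose value is falsy (assumed behavior, inferred from the surrounding code), so an empty list short-circuits to `[]` and the flag is omitted entirely. A minimal standalone sketch with hypothetical values:

```python
# Standalone sketch of the option-rendering idiom; add_option mirrors the
# assumed CommandBuilder rule that a falsy value omits the flag entirely.
args = []

def add_option(name, value):
    if value:  # None, "" and [] all skip the option
        args.extend([name, value])

packages = ["com.databricks:spark-avro_2.11:4.0.0"]  # hypothetical coordinate
exclude_packages = []                                # empty: flag omitted
conf = {"spark.executor.memory": "2g"}

add_option("--packages", packages and '"{}"'.format(",".join(packages)))
add_option("--exclude-packages", exclude_packages and '"{}"'.format(",".join(exclude_packages)))
for key, val in conf.items():
    add_option("--conf", '"{key}={val}"'.format(key=key, val=val))

print(" ".join(args))
# --packages "com.databricks:spark-avro_2.11:4.0.0" --conf "spark.executor.memory=2g"
```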
diff --git a/aztk_cli/config/job.yaml b/aztk_cli/config/job.yaml
index fe5818d2..cfe27fe1 100644
--- a/aztk_cli/config/job.yaml
+++ b/aztk_cli/config/job.yaml
@@ -40,10 +40,20 @@ job:
       main_class:
       jars:
         -
+      packages:
+        -
+      exclude_packages:
+        -
+      repositories:
+        -
       py_files:
         -
       files:
         -
+      conf: # extra spark config options
+        # key1: value1
+        # key2: value2
+      properties_file:
       driver_java_options:
       driver_library_path:
       driver_class_path:
@@ -59,10 +69,20 @@ job:
       main_class:
       jars:
         -
+      packages:
+        -
+      exclude_packages:
+        -
+      repositories:
+        -
       py_files:
         -
       files:
         -
+      conf: # extra spark config options
+        # key1: value1
+        # key2: value2
+      properties_file:
       driver_java_options:
       driver_library_path:
       driver_class_path:
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_submit.py b/aztk_cli/spark/endpoints/cluster/cluster_submit.py
index 473ea0a9..587cd3f9 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_submit.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_submit.py
@@ -7,6 +7,15 @@
 from aztk_cli import config, log, utils
 
 
+class AppendToDict(argparse.Action):
+    def __call__(self, parser, namespace, values, option_string=None):
+        key_vals = getattr(namespace, self.dest) or {}
+        for key_val_str in values.replace(" ", "").split(","):
+            key, val = key_val_str.split("=", 1)
+            key_vals[key] = val
+        setattr(namespace, self.dest, key_vals)
+
+
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument("--id", dest="cluster_id", required=True, help="The unique id of your spark cluster")
 
@@ -25,6 +34,32 @@ def setup_parser(parser: argparse.ArgumentParser):
             absolute path to reference files.",
     )
 
+    parser.add_argument(
+        "--packages",
+        help="Comma-separated list of maven coordinates of \
+            jars to include on the driver and executor \
+            classpaths. Will search the local maven repo, \
+            then maven central and any additional remote \
+            repositories given by --repositories. The \
+            format for the coordinates should be \
+            groupId:artifactId:version.",
+    )
+
+    parser.add_argument(
+        "--exclude-packages",
+        help="Comma-separated list of groupId:artifactId, to \
+            exclude while resolving the dependencies \
+            provided in --packages to avoid dependency \
+            conflicts.",
+    )
+
+    parser.add_argument(
+        "--repositories",
+        help="Comma-separated list of additional remote \
+            repositories to search for the maven \
+            coordinates given with --packages.",
+    )
+
     parser.add_argument(
         "--py-files",
         help="Comma-separated list of .zip, .egg, or .py files \
@@ -39,6 +74,24 @@ def setup_parser(parser: argparse.ArgumentParser):
             absolute path ot reference files.",
     )
 
+    parser.add_argument(
+        "--conf",
+        action=AppendToDict,
+        metavar='"PROP1=VAL1[,PROP2=VAL2...]"',
+        help='Arbitrary Spark configuration properties. \
+            Multiple properties can be set, either by \
+            using multiple --conf flags or by supplying \
+            a comma-separated list. All "PROP=VAL,..." \
+            arguments should be wrapped in double quotes.',
+    )
+
+    parser.add_argument(
+        "--properties-file",
+        help="Path to a file from which to load extra \
+            properties. If not specified, this will look \
+            for conf/spark-defaults.conf.",
+    )
+
     parser.add_argument("--driver-java-options", help="Extra Java options to pass to the driver.")
 
     parser.add_argument("--driver-library-path", help="Extra library path entries to pass to the driver.")
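To sanity-check the `--conf` handling: `AppendToDict` folds repeated flags and comma-separated pairs into a single dict. Note the `split("=", 1)` above, so a value that itself contains `=` (for example a `-Dkey=val` java option) keeps its tail. A runnable sketch using the same action:

```python
import argparse

class AppendToDict(argparse.Action):
    """Same action as in the patch: fold PROP=VAL pairs into one dict."""
    def __call__(self, parser, namespace, values, option_string=None):
        key_vals = getattr(namespace, self.dest) or {}
        for key_val_str in values.replace(" ", "").split(","):
            key, val = key_val_str.split("=", 1)  # split once: values may contain '='
            key_vals[key] = val
        setattr(namespace, self.dest, key_vals)

parser = argparse.ArgumentParser()
parser.add_argument("--conf", action=AppendToDict)
args = parser.parse_args([
    "--conf", "spark.executor.memory=2g,spark.cores.max=4",
    "--conf", "spark.eventLog.enabled=true",
])
print(args.conf)
# {'spark.executor.memory': '2g', 'spark.cores.max': '4', 'spark.eventLog.enabled': 'true'}
```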
@@ -105,6 +158,9 @@ def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
 
     jars = []
+    packages = []
+    exclude_packages = []
+    repositories = []
     py_files = []
     files = []
 
@@ -117,6 +173,15 @@ def execute(args: typing.NamedTuple):
     if args.files is not None:
         files = args.files.replace(" ", "").split(",")
 
+    if args.packages is not None:
+        packages = args.packages.replace(" ", "").split(",")
+
+    if args.exclude_packages is not None:
+        exclude_packages = args.exclude_packages.replace(" ", "").split(",")
+
+    if args.repositories is not None:
+        repositories = args.repositories.replace(" ", "").split(",")
+
     log_application(args, jars, py_files, files)
 
     spark_client.cluster.submit(
@@ -127,8 +192,13 @@ def execute(args: typing.NamedTuple):
             application_args=args.app_args,
             main_class=args.main_class,
             jars=jars,
+            packages=packages,
+            exclude_packages=exclude_packages,
+            repositories=repositories,
             py_files=py_files,
             files=files,
+            conf=args.conf,
+            properties_file=args.properties_file,
             driver_java_options=args.driver_java_options,
             driver_library_path=args.driver_library_path,
             driver_class_path=args.driver_class_path,
diff --git a/docs/70-jobs.md b/docs/70-jobs.md
index 906a331c..271014d0 100644
--- a/docs/70-jobs.md
+++ b/docs/70-jobs.md
@@ -20,10 +20,20 @@ Each Job has one or more applications given as a List in Job.yaml. Applications
     main_class:
     jars:
       -
+    packages:
+      -
+    exclude_packages:
+      -
+    repositories:
+      -
     py_files:
       -
     files:
       -
+    conf:
+      # key1: value1
+      # key2: value2
+    properties_file:
     driver_java_options:
     driver_library_path:
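End to end, the new fields flow from `job.yaml` or the CLI flags into the `Application` model and out to `spark-submit`. For completeness, a hedged sketch of the same submission driven from the Python client; the cluster id, paths, and coordinates are hypothetical, and the keyword names follow the `ApplicationConfiguration` constructor extended in `models.py` above:

```python
import aztk.spark
from aztk_cli import config

# Hypothetical application description exercising the new fields.
app = aztk.spark.models.ApplicationConfiguration(
    name="avro-etl",
    application="/local/path/etl.py",
    packages=["com.databricks:spark-avro_2.11:4.0.0"],
    exclude_packages=["org.slf4j:slf4j-api"],
    repositories=["https://repo.example.com/maven2"],
    conf={"spark.executor.memory": "2g", "spark.cores.max": "4"},
    properties_file="/local/path/spark-extra.conf",
)

spark_client = aztk.spark.Client(config.load_aztk_secrets())
spark_client.cluster.submit(id="my-cluster", application=app)  # cluster id is hypothetical
```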