From 7998f6b594eace682f9f4be3efd6bd57f53c0cc1 Mon Sep 17 00:00:00 2001 From: aokad Date: Fri, 9 Aug 2019 15:23:24 +0900 Subject: [PATCH] update --- scripts/ecsub/submit.py | 179 +++++++++++++++++++++------------------- 1 file changed, 92 insertions(+), 87 deletions(-) diff --git a/scripts/ecsub/submit.py b/scripts/ecsub/submit.py index d374cdb..bcffe33 100644 --- a/scripts/ecsub/submit.py +++ b/scripts/ecsub/submit.py @@ -558,106 +558,111 @@ def terminate_thread(thread): def main(params): - # set cluster_name - params["cluster_name"] = params["task_name"] - if params["cluster_name"] == "": - params["cluster_name"] = os.path.splitext(os.path.basename(params["tasks"]))[0] \ - + '-' \ - + ''.join([random.choice(string.ascii_letters + string.digits) for i in range(5)]) + try: + # set cluster_name + params["cluster_name"] = params["task_name"] + if params["cluster_name"] == "": + params["cluster_name"] = os.path.splitext(os.path.basename(params["tasks"]))[0] \ + + '-' \ + + ''.join([random.choice(string.ascii_letters + string.digits) for i in range(5)]) + + # check param + instance_type_list = params["aws_ec2_instance_type_list"].replace(" ", "") + if len(instance_type_list) == 0: + params["aws_ec2_instance_type_list"] = [params["aws_ec2_instance_type"]] + else: + params["aws_ec2_instance_type_list"] = instance_type_list.split(",") - # check param - instance_type_list = params["aws_ec2_instance_type_list"].replace(" ", "") - if len(instance_type_list) == 0: - params["aws_ec2_instance_type_list"] = [params["aws_ec2_instance_type"]] - else: - params["aws_ec2_instance_type_list"] = instance_type_list.split(",") + if params["aws_ec2_instance_type"] != "": + pass + + elif len(params["aws_ec2_instance_type_list"]) > 0: + if not params["spot"]: + print (ecsub.tools.error_message (params["cluster_name"], None, "--aws-ec2-instance-type-list option is not support with ondemand-instance mode.")) + return 1 + + else: + print (ecsub.tools.error_message (params["cluster_name"], None, "One of --aws-ec2-instance-type option and --aws-ec2-instance-type-list option is required.")) + return 1 - if params["aws_ec2_instance_type"] != "": - pass + # "request_payer_bucket": + request_payer_bucket = params["request_payer_bucket"].replace(" ", "") + if len(request_payer_bucket) == 0: + params["request_payer_bucket"] = [] + else: + params["request_payer_bucket"] = request_payer_bucket.split(",") - elif len(params["aws_ec2_instance_type_list"]) > 0: - if not params["spot"]: - print (ecsub.tools.error_message (params["cluster_name"], None, "--aws-ec2-instance-type-list option is not support with ondemand-instance mode.")) + # read tasks file + task_params = read_tasksfile(params["tasks"], params["cluster_name"]) + if task_params == None: + #print (ecsub.tools.error_message (params["cluster_name"], None, "task file is invalid.")) return 1 - else: - print (ecsub.tools.error_message (params["cluster_name"], None, "One of --aws-ec2-instance-type option and --aws-ec2-instance-type-list option is required.")) - return 1 - - # "request_payer_bucket": - request_payer_bucket = params["request_payer_bucket"].replace(" ", "") - if len(request_payer_bucket) == 0: - params["request_payer_bucket"] = [] - else: - params["request_payer_bucket"] = request_payer_bucket.split(",") + if task_params["tasks"] == []: + print (ecsub.tools.info_message (params["cluster_name"], None, "task file is empty.")) + return 0 - # read tasks file - task_params = read_tasksfile(params["tasks"], params["cluster_name"]) - if task_params == None: - #print (ecsub.tools.error_message (params["cluster_name"], None, "task file is invalid.")) - return 1 - - if task_params["tasks"] == []: - print (ecsub.tools.info_message (params["cluster_name"], None, "task file is empty.")) - return 0 - - subdir = params["cluster_name"] - - params["wdir"] = params["wdir"].rstrip("/") + "/" + subdir - params["aws_s3_bucket"] = params["aws_s3_bucket"].rstrip("/") + "/" + subdir - - if os.path.exists (params["wdir"]): - shutil.rmtree(params["wdir"]) - print (ecsub.tools.info_message (params["cluster_name"], None, "'%s' existing directory was deleted." % (params["wdir"]))) + subdir = params["cluster_name"] - os.makedirs(params["wdir"]) - os.makedirs(params["wdir"] + "/log") - os.makedirs(params["wdir"] + "/conf") - os.makedirs(params["wdir"] + "/script") - - # disk-size - if params["disk_size"] < 1: - print (ecsub.tools.error_message (params["cluster_name"], None, "disk-size %d is smaller than expected size 1GB." % (params["disk_size"]))) - return 1 + params["wdir"] = params["wdir"].rstrip("/") + "/" + subdir + params["aws_s3_bucket"] = params["aws_s3_bucket"].rstrip("/") + "/" + subdir - aws_instance = ecsub.aws.Aws_ecsub_control(params, len(task_params["tasks"])) - - # check task-param - if not aws_instance.check_awsconfigure(): - return 1 + if os.path.exists (params["wdir"]): + shutil.rmtree(params["wdir"]) + print (ecsub.tools.info_message (params["cluster_name"], None, "'%s' existing directory was deleted." % (params["wdir"]))) + + os.makedirs(params["wdir"]) + os.makedirs(params["wdir"] + "/log") + os.makedirs(params["wdir"] + "/conf") + os.makedirs(params["wdir"] + "/script") + + # disk-size + if params["disk_size"] < 1: + print (ecsub.tools.error_message (params["cluster_name"], None, "disk-size %d is smaller than expected size 1GB." % (params["disk_size"]))) + return 1 + + aws_instance = ecsub.aws.Aws_ecsub_control(params, len(task_params["tasks"])) + + # check task-param + if not aws_instance.check_awsconfigure(): + return 1 - # check s3-files path - (regions, invalid_pathes) = check_inputfiles(aws_instance, task_params, params["cluster_name"], params["request_payer_bucket"], params["aws_s3_bucket"]) - if len(regions) > 1: - if params["ignore_location"]: - print (ecsub.tools.warning_message (params["cluster_name"], None, "your task uses multipule regions '%s'." % (",".join(regions)))) - else: - print (ecsub.tools.error_message (params["cluster_name"], None, "your task uses multipule regions '%s'." % (",".join(regions)))) + # check s3-files path + (regions, invalid_pathes) = check_inputfiles(aws_instance, task_params, params["cluster_name"], params["request_payer_bucket"], params["aws_s3_bucket"]) + if len(regions) > 1: + if params["ignore_location"]: + print (ecsub.tools.warning_message (params["cluster_name"], None, "your task uses multipule regions '%s'." % (",".join(regions)))) + else: + print (ecsub.tools.error_message (params["cluster_name"], None, "your task uses multipule regions '%s'." % (",".join(regions)))) + return 1 + + for r in invalid_pathes: + print (ecsub.tools.error_message (params["cluster_name"], None, "input '%s' is not access." % (r))) + if len(invalid_pathes)> 0: return 1 - for r in invalid_pathes: - print (ecsub.tools.error_message (params["cluster_name"], None, "input '%s' is not access." % (r))) - if len(invalid_pathes)> 0: - return 1 - - # write task-scripts, and upload to S3 - local_script_dir = params["wdir"] + "/script" - s3_script_dir = params["aws_s3_bucket"].rstrip("/") + "/script" - if not upload_scripts(task_params, - aws_instance, - local_script_dir, - s3_script_dir, - params["script"], - params["cluster_name"], - params["shell"], - params["request_payer_bucket"]): - print (ecsub.tools.error_message (params["cluster_name"], None, "failure upload files to s3 bucket: %s." % (params["aws_s3_bucket"]))) + # write task-scripts, and upload to S3 + local_script_dir = params["wdir"] + "/script" + s3_script_dir = params["aws_s3_bucket"].rstrip("/") + "/script" + if not upload_scripts(task_params, + aws_instance, + local_script_dir, + s3_script_dir, + params["script"], + params["cluster_name"], + params["shell"], + params["request_payer_bucket"]): + print (ecsub.tools.error_message (params["cluster_name"], None, "failure upload files to s3 bucket: %s." % (params["aws_s3_bucket"]))) + return 1 + + # Ebs Price + if not aws_instance.set_ebs_price (): + return 1 + + except KeyboardInterrupt: + print ("KeyboardInterrupt") return 1 - # Ebs Price - if not aws_instance.set_ebs_price (): - return 1 - # run purocesses thread_list = [] ctx = {}