Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
aokad committed Aug 9, 2019
1 parent f2eed7d commit 7998f6b
Showing 1 changed file with 92 additions and 87 deletions.
179 changes: 92 additions & 87 deletions scripts/ecsub/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,106 +558,111 @@ def terminate_thread(thread):

def main(params):

# set cluster_name
params["cluster_name"] = params["task_name"]
if params["cluster_name"] == "":
params["cluster_name"] = os.path.splitext(os.path.basename(params["tasks"]))[0] \
+ '-' \
+ ''.join([random.choice(string.ascii_letters + string.digits) for i in range(5)])
try:
# set cluster_name
params["cluster_name"] = params["task_name"]
if params["cluster_name"] == "":
params["cluster_name"] = os.path.splitext(os.path.basename(params["tasks"]))[0] \
+ '-' \
+ ''.join([random.choice(string.ascii_letters + string.digits) for i in range(5)])

# check param
instance_type_list = params["aws_ec2_instance_type_list"].replace(" ", "")
if len(instance_type_list) == 0:
params["aws_ec2_instance_type_list"] = [params["aws_ec2_instance_type"]]
else:
params["aws_ec2_instance_type_list"] = instance_type_list.split(",")

# check param
instance_type_list = params["aws_ec2_instance_type_list"].replace(" ", "")
if len(instance_type_list) == 0:
params["aws_ec2_instance_type_list"] = [params["aws_ec2_instance_type"]]
else:
params["aws_ec2_instance_type_list"] = instance_type_list.split(",")
if params["aws_ec2_instance_type"] != "":
pass

elif len(params["aws_ec2_instance_type_list"]) > 0:
if not params["spot"]:
print (ecsub.tools.error_message (params["cluster_name"], None, "--aws-ec2-instance-type-list option is not support with ondemand-instance mode."))
return 1

else:
print (ecsub.tools.error_message (params["cluster_name"], None, "One of --aws-ec2-instance-type option and --aws-ec2-instance-type-list option is required."))
return 1

if params["aws_ec2_instance_type"] != "":
pass
# "request_payer_bucket":
request_payer_bucket = params["request_payer_bucket"].replace(" ", "")
if len(request_payer_bucket) == 0:
params["request_payer_bucket"] = []
else:
params["request_payer_bucket"] = request_payer_bucket.split(",")

elif len(params["aws_ec2_instance_type_list"]) > 0:
if not params["spot"]:
print (ecsub.tools.error_message (params["cluster_name"], None, "--aws-ec2-instance-type-list option is not support with ondemand-instance mode."))
# read tasks file
task_params = read_tasksfile(params["tasks"], params["cluster_name"])
if task_params == None:
#print (ecsub.tools.error_message (params["cluster_name"], None, "task file is invalid."))
return 1

else:
print (ecsub.tools.error_message (params["cluster_name"], None, "One of --aws-ec2-instance-type option and --aws-ec2-instance-type-list option is required."))
return 1

# "request_payer_bucket":
request_payer_bucket = params["request_payer_bucket"].replace(" ", "")
if len(request_payer_bucket) == 0:
params["request_payer_bucket"] = []
else:
params["request_payer_bucket"] = request_payer_bucket.split(",")
if task_params["tasks"] == []:
print (ecsub.tools.info_message (params["cluster_name"], None, "task file is empty."))
return 0

# read tasks file
task_params = read_tasksfile(params["tasks"], params["cluster_name"])
if task_params == None:
#print (ecsub.tools.error_message (params["cluster_name"], None, "task file is invalid."))
return 1

if task_params["tasks"] == []:
print (ecsub.tools.info_message (params["cluster_name"], None, "task file is empty."))
return 0

subdir = params["cluster_name"]

params["wdir"] = params["wdir"].rstrip("/") + "/" + subdir
params["aws_s3_bucket"] = params["aws_s3_bucket"].rstrip("/") + "/" + subdir

if os.path.exists (params["wdir"]):
shutil.rmtree(params["wdir"])
print (ecsub.tools.info_message (params["cluster_name"], None, "'%s' existing directory was deleted." % (params["wdir"])))
subdir = params["cluster_name"]

os.makedirs(params["wdir"])
os.makedirs(params["wdir"] + "/log")
os.makedirs(params["wdir"] + "/conf")
os.makedirs(params["wdir"] + "/script")

# disk-size
if params["disk_size"] < 1:
print (ecsub.tools.error_message (params["cluster_name"], None, "disk-size %d is smaller than expected size 1GB." % (params["disk_size"])))
return 1
params["wdir"] = params["wdir"].rstrip("/") + "/" + subdir
params["aws_s3_bucket"] = params["aws_s3_bucket"].rstrip("/") + "/" + subdir

aws_instance = ecsub.aws.Aws_ecsub_control(params, len(task_params["tasks"]))

# check task-param
if not aws_instance.check_awsconfigure():
return 1
if os.path.exists (params["wdir"]):
shutil.rmtree(params["wdir"])
print (ecsub.tools.info_message (params["cluster_name"], None, "'%s' existing directory was deleted." % (params["wdir"])))

os.makedirs(params["wdir"])
os.makedirs(params["wdir"] + "/log")
os.makedirs(params["wdir"] + "/conf")
os.makedirs(params["wdir"] + "/script")

# disk-size
if params["disk_size"] < 1:
print (ecsub.tools.error_message (params["cluster_name"], None, "disk-size %d is smaller than expected size 1GB." % (params["disk_size"])))
return 1

aws_instance = ecsub.aws.Aws_ecsub_control(params, len(task_params["tasks"]))

# check task-param
if not aws_instance.check_awsconfigure():
return 1

# check s3-files path
(regions, invalid_pathes) = check_inputfiles(aws_instance, task_params, params["cluster_name"], params["request_payer_bucket"], params["aws_s3_bucket"])
if len(regions) > 1:
if params["ignore_location"]:
print (ecsub.tools.warning_message (params["cluster_name"], None, "your task uses multipule regions '%s'." % (",".join(regions))))
else:
print (ecsub.tools.error_message (params["cluster_name"], None, "your task uses multipule regions '%s'." % (",".join(regions))))
# check s3-files path
(regions, invalid_pathes) = check_inputfiles(aws_instance, task_params, params["cluster_name"], params["request_payer_bucket"], params["aws_s3_bucket"])
if len(regions) > 1:
if params["ignore_location"]:
print (ecsub.tools.warning_message (params["cluster_name"], None, "your task uses multipule regions '%s'." % (",".join(regions))))
else:
print (ecsub.tools.error_message (params["cluster_name"], None, "your task uses multipule regions '%s'." % (",".join(regions))))
return 1

for r in invalid_pathes:
print (ecsub.tools.error_message (params["cluster_name"], None, "input '%s' is not access." % (r)))
if len(invalid_pathes)> 0:
return 1

for r in invalid_pathes:
print (ecsub.tools.error_message (params["cluster_name"], None, "input '%s' is not access." % (r)))
if len(invalid_pathes)> 0:
return 1

# write task-scripts, and upload to S3
local_script_dir = params["wdir"] + "/script"
s3_script_dir = params["aws_s3_bucket"].rstrip("/") + "/script"
if not upload_scripts(task_params,
aws_instance,
local_script_dir,
s3_script_dir,
params["script"],
params["cluster_name"],
params["shell"],
params["request_payer_bucket"]):
print (ecsub.tools.error_message (params["cluster_name"], None, "failure upload files to s3 bucket: %s." % (params["aws_s3_bucket"])))
# write task-scripts, and upload to S3
local_script_dir = params["wdir"] + "/script"
s3_script_dir = params["aws_s3_bucket"].rstrip("/") + "/script"
if not upload_scripts(task_params,
aws_instance,
local_script_dir,
s3_script_dir,
params["script"],
params["cluster_name"],
params["shell"],
params["request_payer_bucket"]):
print (ecsub.tools.error_message (params["cluster_name"], None, "failure upload files to s3 bucket: %s." % (params["aws_s3_bucket"])))
return 1

# Ebs Price
if not aws_instance.set_ebs_price ():
return 1

except KeyboardInterrupt:
print ("KeyboardInterrupt")
return 1

# Ebs Price
if not aws_instance.set_ebs_price ():
return 1

# run purocesses
thread_list = []
ctx = {}
Expand Down

0 comments on commit 7998f6b

Please sign in to comment.