diff --git a/.travis.yml b/.travis.yml index 892b4a8..4a06dc4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -37,11 +37,11 @@ before_script: script: - pip install awscli boto3 - python setup.py build install - - if [ ${SUBMIT} == 1 ]; then python setup.py test; fi + - if [ ${SUBMIT} == 1 ]; then python -m unittest discover -s ./tests -p unit_tests.py; fi - if [ ${SUBMIT} == 0 ]; then python -m unittest discover -s ./tests -p unit_tests_short.py; fi notifications: emails: - - aiokada@hgc.jp + - aokada@ncc.go.jp on_success: change on_failure: always diff --git a/ecsub b/ecsub index 09daf96..6cfb948 100644 --- a/ecsub +++ b/ecsub @@ -57,7 +57,7 @@ def main(): submit_parser.add_argument("--ignore-location", help = "Ignore differences in location", action = 'store_true') submit_parser.add_argument("--not-verify-bucket", help = "Do not verify input pathes", action = 'store_true') submit_parser.add_argument("--skip-price", help = "Skip get pricing from aws", action = 'store_true') - + submit_parser.add_argument("--waiter-delay", metavar = 15, help = "The amount of time(sec) to boto3 waiter", type = int, default = default.waiter_delay) return submit_parser ########## diff --git a/scripts/ecsub/__init__.py b/scripts/ecsub/__init__.py index 40428ef..2068bc3 100644 --- a/scripts/ecsub/__init__.py +++ b/scripts/ecsub/__init__.py @@ -1 +1 @@ -__version__ = '0.0.25' \ No newline at end of file +__version__ = '0.0.26' \ No newline at end of file diff --git a/scripts/ecsub/aws.py b/scripts/ecsub/aws.py index 9756a74..74ec29d 100644 --- a/scripts/ecsub/aws.py +++ b/scripts/ecsub/aws.py @@ -17,6 +17,7 @@ import ecsub.tools import glob import base64 +import pprint class Aws_ecsub_control: @@ -56,7 +57,7 @@ def __init__(self, params, task_num): self.aws_subnet_id = params["aws_subnet_id"] self.image = params["image"] self.use_amazon_ecr = params["use_amazon_ecr"] - + self.waiter_delay = params["waiter_delay"] self.task_definition_arn = "" self.cluster_arn = "" @@ -547,27 +548,27 @@ def _getblock_device_mappings(self): def _wait_run_instance(self, instance_id, no): + response = boto3.client("ec2").describe_instance_status(InstanceIds=[instance_id]) + max_attempts = int(1800/self.waiter_delay) + print_interval = int(600/self.waiter_delay) + wait_counter = print_interval if instance_id != "": - cmd_template = "{setx}; aws ec2 wait instance-running --instance-ids {INSTANCE_ID}" - cmd = cmd_template.format( - setx = self.setx, - INSTANCE_ID = instance_id - ) - self._subprocess_call(cmd, no) - - cmd_template = "{setx}; aws ec2 wait instance-status-ok --include-all-instances --instance-ids {INSTANCE_ID}" - cmd = cmd_template.format( - setx = self.setx, - INSTANCE_ID = instance_id - ) - self._subprocess_call(cmd, no) - - for i in range(3): + for i in range(max_attempts): + try: + if response['InstanceStatuses'][0]['InstanceState']['Name'] == "running" \ + and response['InstanceStatuses'][0]['InstanceStatus']['Status'] == "ok": + return True + except Exception: + pass + + if wait_counter == print_interval: + print(ecsub.tools.info_message (self.cluster_name, no, "wait instance-status-ok instance-ids=%s" % (instance_id))) + wait_counter = 0 + + time.sleep(self.waiter_delay) response = boto3.client("ec2").describe_instance_status(InstanceIds=[instance_id]) - if response['InstanceStatuses'][0]['InstanceStatus']['Status'] == "ok": - return True - self._subprocess_call(cmd, no) - + wait_counter += 1 + print(ecsub.tools.error_message (self.cluster_name, no, "Failure run instance.")) return False @@ -896,12 +897,18 @@ def run_instances_spot (self, no): return instance_id elif state == "open" and status_code == 'pending-evaluation': - cmd_template = "{setx}; aws ec2 wait spot-instance-request-fulfilled --spot-instance-request-ids {REQUEST_ID}" - cmd = cmd_template.format( - setx = self.setx, - REQUEST_ID = request_id - ) - self._subprocess_call(cmd, no) + max_attempts = int(600/self.waiter_delay) + print_interval = int(600/self.waiter_delay) + wait_counter = print_interval + for j in range(max_attempts): + if wait_counter == print_interval: + print(ecsub.tools.info_message (self.cluster_name, no, "spot-instance-request-fulfilled spot-instance-request-ids=%s" % (request_id))) + wait_counter = 0 + time.sleep(self.waiter_delay) + response2 = self._describe_spot_instances(no, request_id = request_id) + wait_counter += 1 + if response2['Status']['Code'] == "fulfilled": + break else: print(ecsub.tools.error_message (self.cluster_name, no, "Failure request-spot-instances. [Status] %s [Code] %s [Message] %s" % (response['State'], @@ -1122,31 +1129,32 @@ def run_task (self, no, instance_id): return (0, None) # wait to task-stop - cmd_template = "{setx};aws ecs wait tasks-stopped --tasks {TASK_ARN} --cluster {CLUSTER_ARN}" - - cmd = cmd_template.format( - setx = self.setx, - CLUSTER_ARN = self.cluster_arn, - TASK_ARN = task_arn - ) - self._subprocess_call(cmd, no) - + #print(ecsub.tools.info_message (self.cluster_name, no, "wait tasks-stopped task=%s" % (task_arn))) response = boto3.client('ecs').describe_tasks( cluster=self.cluster_arn, tasks=[task_arn] ) + print_interval = int(600/self.waiter_delay) + wait_counter = print_interval while True: - if len(response["tasks"]) == 0: - return (exit_code, log_file) - if response["tasks"][0]['lastStatus'] != "RUNNING": - break + try: + if response["tasks"][0]['lastStatus'] == 'STOPPED': + break + except Exception: + pprint.pprint(response["tasks"]) + pass - self._subprocess_call(cmd, no) + if wait_counter == print_interval: + print(ecsub.tools.info_message (self.cluster_name, no, "wait tasks-stopped task=%s" % (task_arn))) + wait_counter = 0 + + time.sleep(self.waiter_delay) response = boto3.client('ecs').describe_tasks( cluster=self.cluster_arn, tasks=[task_arn] ) - + wait_counter += 1 + # check exit code log_file = self._log_path("describe-tasks.%03d" % (no)) @@ -1200,17 +1208,39 @@ def terminate_instances (self, instance_id, no = None): if no != None: log_file = self._log_path("terminate-instances.%03d" % (no)) + if instance_id == "": + return + cmd_template = "{setx};" \ - + "aws ec2 terminate-instances --instance-ids {ec2InstanceId} > {log};" \ - + "aws ec2 wait instance-terminated --instance-ids {ec2InstanceId}" - + + "aws ec2 terminate-instances --instance-ids {ec2InstanceId} > {log}" cmd = cmd_template.format( setx = self.setx, log = log_file, ec2InstanceId = instance_id ) self._subprocess_call(cmd, no) - + instance_id_list = instance_id.split(" ") + if len(instance_id_list) > 1: + return + + response = boto3.client("ec2").describe_instance_status(InstanceIds=instance_id_list) + max_attempts = int(600/self.waiter_delay) + print_interval = int(600/self.waiter_delay) + wait_counter = print_interval + for i in range(max_attempts): + try: + if response['InstanceStatuses'][0]['InstanceState']['Name'] == "terminated": + break + except Exception: + break + + if wait_counter == print_interval: + print(ecsub.tools.info_message (self.cluster_name, no, "wait terminated instance-ids=%s" % (instance_id))) + wait_counter = 0 + time.sleep(self.waiter_delay) + response = boto3.client("ec2").describe_instance_status(InstanceIds=instance_id_list) + wait_counter += 1 + def cancel_spot_instance_requests (self, no = None, instance_id = None, spot_req_id = None): log_file = self._log_path("cancel-spot-instance-requests") @@ -1254,8 +1284,10 @@ def clean_up (self): except Exception: pass - if len(instance_ids) > 0: - self.terminate_instances (" ".join(instance_ids)) + STEP = 100 + instance_ids = list(set(instance_ids)) + for i in range(0, len(instance_ids), STEP): + self.terminate_instances (" ".join(instance_ids[i:i+STEP])) # cancel_spot_instance_requests req_ids = [] @@ -1265,9 +1297,10 @@ def clean_up (self): req_ids.append(log["SpotInstanceRequests"][0]["SpotInstanceRequestId"]) except Exception: pass - - if len(req_ids) > 0: - self.cancel_spot_instance_requests (spot_req_id = " ".join(req_ids)) + + req_ids = list(set(req_ids)) + for i in range(0, len(req_ids), STEP): + self.cancel_spot_instance_requests (spot_req_id = " ".join(req_ids[i:i+STEP])) # delete cluster if self.cluster_arn != "": diff --git a/scripts/ecsub/submit.py b/scripts/ecsub/submit.py index 3c243f5..60bd0f3 100644 --- a/scripts/ecsub/submit.py +++ b/scripts/ecsub/submit.py @@ -373,6 +373,7 @@ def _run_task(aws_instance, no, instance_id): try: (exit_code, task_log) = aws_instance.run_task(no, instance_id) + #print([exit_code, task_log]) #if exit_code == 127: # system_error = True @@ -861,6 +862,7 @@ def __init__(self): self.ignore_location = False self.not_verify_bucket = False self.skip_price = False + self.waiter_delay = 15 # The followings are not optional self.setx = "set -x" diff --git a/setup.py b/setup.py index 64550ae..5e59f3e 100644 --- a/setup.py +++ b/setup.py @@ -56,6 +56,5 @@ # -*- Entry points: -*- """, package_data = { - }, - test_suite = 'unit_tests.suite' + } ) diff --git a/tests/test-wordcount3.tsv b/tests/test-wordcount3.tsv new file mode 100644 index 0000000..e5cfc08 --- /dev/null +++ b/tests/test-wordcount3.tsv @@ -0,0 +1,8 @@ +# comment +# comment +# comment + +--secret-env SECRET_NAME --env NAME --input INPUT_FILE --input-recursive SCRIPT --output OUTPUT_FILE +AQICAHiNgk2UbjadF957JEIEUfjZQBC+acKraTS+ce11+vjlvgESKnDQ3IPSgLhOcM7OLVINAAAA5jCB4wYJKoZIhvcNAQcGoIHVMIHSAgEAMIHMBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDPjR2P0YddOyp2oFVAIBEICBnrXFE6AKkrMCQuMy6cAZTtwCsGAIPJfHOZzlbzXOilYZRRivMa/rOKX2XTHRH/UJqsyrKO+EjgdOUOrI6eGCX0jMj+Isg1bgdNcUDO01hxHkYs0tO753Qz/YiRFX6kIlNcyKwRSL2AK3gGgrM2FrfeDlz/zVQVS0q0o0oRuGkOZFZ8RbmnhOki9pjBgPYuNy+ooT4RArHUitG4hShMJ2 The_quick_brown_fox_jumps_over_the_lazy_dog_The_quick_brown_fox_jumps_over_the_lazy_dog_The_quick_brown_fox_jumps_over_the_lazy_dog s3://travisci-work/wordcount/input/hamlet.txt s3://travisci-work/wordcount/python s3://travisci-work/wordcount/output/hamlet-count.txt +AQICAHiNgk2UbjadF957JEIEUfjZQBC+acKraTS+ce11+vjlvgESKnDQ3IPSgLhOcM7OLVINAAAA5jCB4wYJKoZIhvcNAQcGoIHVMIHSAgEAMIHMBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDPjR2P0YddOyp2oFVAIBEICBnrXFE6AKkrMCQuMy6cAZTtwCsGAIPJfHOZzlbzXOilYZRRivMa/rOKX2XTHRH/UJqsyrKO+EjgdOUOrI6eGCX0jMj+Isg1bgdNcUDO01hxHkYs0tO753Qz/YiRFX6kIlNcyKwRSL2AK3gGgrM2FrfeDlz/zVQVS0q0o0oRuGkOZFZ8RbmnhOki9pjBgPYuNy+ooT4RArHUitG4hShMJ2 The_quick_brown_fox_jumps_over_the_lazy_dog_The_quick_brown_fox_jumps_over_the_lazy_dog_The_quick_brown_fox_jumps_over_the_lazy_dog s3://travisci-work/wordcount/input/hamlet.txt s3://travisci-work/wordcount/python s3://travisci-work/wordcount/output/hamlet-count.txt +AQICAHiNgk2UbjadF957JEIEUfjZQBC+acKraTS+ce11+vjlvgESKnDQ3IPSgLhOcM7OLVINAAAA5jCB4wYJKoZIhvcNAQcGoIHVMIHSAgEAMIHMBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDPjR2P0YddOyp2oFVAIBEICBnrXFE6AKkrMCQuMy6cAZTtwCsGAIPJfHOZzlbzXOilYZRRivMa/rOKX2XTHRH/UJqsyrKO+EjgdOUOrI6eGCX0jMj+Isg1bgdNcUDO01hxHkYs0tO753Qz/YiRFX6kIlNcyKwRSL2AK3gGgrM2FrfeDlz/zVQVS0q0o0oRuGkOZFZ8RbmnhOki9pjBgPYuNy+ooT4RArHUitG4hShMJ2 The_quick_brown_fox_jumps_over_the_lazy_dog_The_quick_brown_fox_jumps_over_the_lazy_dog_The_quick_brown_fox_jumps_over_the_lazy_dog s3://travisci-work/wordcount/input/hamlet.txt s3://travisci-work/wordcount/python s3://travisci-work/wordcount/output/hamlet-count.txt diff --git a/tests/unit_tests.py b/tests/unit_tests.py index 71c2652..8aa43ae 100644 --- a/tests/unit_tests.py +++ b/tests/unit_tests.py @@ -59,7 +59,7 @@ def test2_02_submit(self): "--image", "python:2-alpine3.6", "--shell", "ash", "--script", "./tests/run-wordcount.sh", - "--tasks", "./tests/test-wordcount.tsv", + "--tasks", "./tests/test-wordcount3.tsv", "--aws-ec2-instance-type", "t2.micro", "--disk-size", "1", "--aws-s3-bucket", "s3://travisci-work/wordcount/output/",