From 5581d9f3d92eccf0f85ff160e7d5d0ea352e14ad Mon Sep 17 00:00:00 2001 From: Vikas Kumar Date: Mon, 10 Aug 2020 14:13:53 -0700 Subject: [PATCH 1/2] Adding pagination in list_Training_jobs --- smdebug/rules/action/stop_training_action.py | 71 +++++++++++++------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/smdebug/rules/action/stop_training_action.py b/smdebug/rules/action/stop_training_action.py index 53f1b5fdc..a0ababd33 100644 --- a/smdebug/rules/action/stop_training_action.py +++ b/smdebug/rules/action/stop_training_action.py @@ -20,31 +20,56 @@ def __init__(self, rule_name, training_job_prefix): self._rule_name = rule_name self._found_jobs = self._get_sm_tj_jobs_with_prefix() - def _get_sm_tj_jobs_with_prefix(self): - found_jobs = [] - try: - jobs = self._sm_client.list_training_jobs() - if "TrainingJobSummaries" in jobs: - jobs = jobs["TrainingJobSummaries"] - else: - self._logger.info( - f"No TrainingJob summaries found: list_training_jobs output is : {jobs}" - ) - return - for job in jobs: + def _get_sm_tj_jobs_with_prefix(self, status=["InProgress"]): + res = {} + found_job_dict = {} + next_token = None + name = self._training_job_prefix + i = 0 + while i < 50: + try: + if next_token is None: + res = self._sm_client.list_training_jobs( + NameContains=name, + SortBy="CreationTime", + SortOrder="Descending", + StatusEquals="InProgress", + ) + else: + res = self._sm_client.list_training_jobs( + NextToken=next_token, + NameContains=name, + SortBy="CreationTime", + SortOrder="Descending", + StatusEquals="InProgress", + ) + if "TrainingJobSummaries" in res: + jobs = res["TrainingJobSummaries"] + else: + self._logger.info( + f"No TrainingJob summaries found: list_training_jobs output is : {res}" + ) + return + for job in jobs: + tj_status = job["TrainingJobStatus"] + tj_name = job["TrainingJobName"] + self._logger.info(f"TrainingJob name: {tj_name} , status:{tj_status}") + if tj_name is not None and tj_name.startswith(name): + found_job_dict[tj_name] = 1 + self._logger.info(f"found_training job {found_job_dict.keys()}") + except Exception as e: self._logger.info( - f"TrainingJob name: {job['TrainingJobName']} , status:{job['TrainingJobStatus']}" + f"Caught exception while getting list_training_job exception is: \n {e}. Attempt:{i}" ) - if job["TrainingJobName"] is not None and job["TrainingJobName"].startswith( - self._training_job_prefix - ): - found_jobs.append(job["TrainingJobName"]) - self._logger.info(f"found_training job {found_jobs}") - except Exception as e: - self._logger.info( - f"Caught exception while getting list_training_job exception is: \n {e}" - ) - return found_jobs + if "NextToken" not in jobs: + break + else: + next_token = jobs["NextToken"] + res = {} + jobs = {} + i += 1 + + return found_job_dict.keys() def _stop_training_job(self): if len(self._found_jobs) != 1: From 90b57ddaf8077f74a426901e1de30048e47e1fb8 Mon Sep 17 00:00:00 2001 From: Vikas Kumar Date: Mon, 10 Aug 2020 15:23:09 -0700 Subject: [PATCH 2/2] fix --- smdebug/rules/action/stop_training_action.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/smdebug/rules/action/stop_training_action.py b/smdebug/rules/action/stop_training_action.py index a0ababd33..e665ed041 100644 --- a/smdebug/rules/action/stop_training_action.py +++ b/smdebug/rules/action/stop_training_action.py @@ -20,7 +20,7 @@ def __init__(self, rule_name, training_job_prefix): self._rule_name = rule_name self._found_jobs = self._get_sm_tj_jobs_with_prefix() - def _get_sm_tj_jobs_with_prefix(self, status=["InProgress"]): + def _get_sm_tj_jobs_with_prefix(self): res = {} found_job_dict = {} next_token = None @@ -61,10 +61,10 @@ def _get_sm_tj_jobs_with_prefix(self, status=["InProgress"]): self._logger.info( f"Caught exception while getting list_training_job exception is: \n {e}. Attempt:{i}" ) - if "NextToken" not in jobs: + if "NextToken" not in res: break else: - next_token = jobs["NextToken"] + next_token = res["NextToken"] res = {} jobs = {} i += 1