diff --git a/ert_gui/simulation/progress.py b/ert_gui/simulation/progress.py index 4062a0c07d9..ae329efa105 100644 --- a/ert_gui/simulation/progress.py +++ b/ert_gui/simulation/progress.py @@ -66,10 +66,12 @@ def setIndeterminate(self, indeterminate): else: self.__timer.stop() + def get_indeterminate(self): + return self.__indeterminate + def setIndeterminateColor(self, color): self.__indeterminate_color = color - def paintEvent(self, paint_event): QFrame.paintEvent(self, paint_event) painter = QPainter(self) diff --git a/ert_gui/simulation/run_dialog.py b/ert_gui/simulation/run_dialog.py index e38da9951c6..662a01a614f 100644 --- a/ert_gui/simulation/run_dialog.py +++ b/ert_gui/simulation/run_dialog.py @@ -1,43 +1,26 @@ from threading import Thread -import sys - -try: - from PyQt4.QtCore import Qt, QTimer, QSize - from PyQt4.QtGui import (QDialog, - QVBoxLayout, - QLayout, - QMessageBox, - QPushButton, - QHBoxLayout, - QColor, - QLabel, - QListView, - QStandardItemModel, - QStandardItem, - QWidget) -except ImportError: - from PyQt5.QtCore import Qt, QTimer, QSize - from PyQt5.QtWidgets import (QDialog, - QVBoxLayout, - QLayout, - QMessageBox, - QPushButton, - QHBoxLayout, - QLabel, - QListView, - QWidget) - from PyQt5.QtGui import QColor, QStandardItemModel, QStandardItem - -from ert_gui.ertwidgets import resourceMovie, Legend -from ert_gui.simulation import Progress, SimpleProgress, DetailedProgressWidget -from ert_shared.models import BaseRunModel, SimulationsTracker + +from ecl.util.util import BoolVector +from ert_gui.ertwidgets import Legend, resourceMovie +from ert_gui.simulation import DetailedProgressWidget, Progress, SimpleProgress from ert_gui.tools.plot.plot_tool import PlotTool +from ert_shared.models import BaseRunModel +from ert_shared.tracker.events import (DetailedEvent, EndEvent, GeneralEvent, + TickEvent) +from ert_shared.tracker.factory import create_tracker +from ert_shared.tracker.utils import format_running_time +from ErtQt.Qt import (QColor, QDialog, QHBoxLayout, QLabel, QLayout, QListView, + QMessageBox, QPushButton, QSize, QStackedWidget, + QStandardItem, QStandardItemModel, Qt, QTimer, + QToolButton, QVBoxLayout, QWidget, pyqtSignal, pyqtSlot) from res.job_queue import JobStatusType -from ecl.util.util import BoolVector + class RunDialog(QDialog): - def __init__(self, config_file, run_model, parent): + simulation_done = pyqtSignal(bool, str) + + def __init__(self, config_file, run_model, arguments, parent): QDialog.__init__(self, parent) self.setWindowFlags(Qt.Window) self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint) @@ -52,8 +35,14 @@ def __init__(self, config_file, run_model, parent): if isinstance(run_model, BaseRunModel): ert = run_model.ert() - self.simulations_tracker = SimulationsTracker(model=run_model) - states = self.simulations_tracker.getStates() + self._simulations_argments = arguments + + self.simulations_tracker = create_tracker( + run_model, qtimer_cls=QTimer, + event_handler=self._on_tracker_event, + num_realizations=arguments["active_realizations"].count()) + + states = self.simulations_tracker.get_states() self.state_colors = {state.name: state.color for state in states} self.state_colors['Success'] = self.state_colors["Finished"] self.state_colors['Failure'] = self.state_colors["Failed"] @@ -149,29 +138,24 @@ def __init__(self, config_file, run_model, parent): self.done_button.clicked.connect(self.accept) self.restart_button.clicked.connect(self.restart_failed_realizations) self.show_details_button.clicked.connect(self.toggle_detailed_progress) - - self.__updating = False - self.__update_queued = False - self.__simulation_started = False - - self.__update_timer = QTimer(self) - self.__update_timer.setInterval(500) - self.__update_timer.timeout.connect(self.updateRunStatus) - self._simulations_argments = {} + self.simulation_done.connect(self._on_simulation_done) def reject(self): return def closeEvent(self, QCloseEvent): - if not self.checkIfRunFinished(): - #Kill jobs if dialog is closed + self.simulations_tracker.stop() + if self._run_model.isFinished(): + self.simulation_done.emit(self._run_model.hasRunFailed(), + self._run_model.getFailMessage()) + else: + # Kill jobs if dialog is closed if self.killJobs() != QMessageBox.Yes: QCloseEvent.ignore() - def startSimulation(self, arguments): - - self._simulations_argments = arguments + def startSimulation(self): self._run_model.reset() + self.simulations_tracker.reset() def run(): self._run_model.startSimulations( self._simulations_argments ) @@ -181,55 +165,7 @@ def run(): simulation_thread.run = run simulation_thread.start() - self.__update_timer.start() - - - def checkIfRunFinished(self): - if self._run_model.isFinished(): - self.hideKillAndShowDone() - - if self._run_model.hasRunFailed(): - error = self._run_model.getFailMessage() - QMessageBox.critical(self, "Simulations failed!", "The simulation failed with the following error:\n\n%s" % error) - - return True - return False - - def updateProgress(self): - self.simulations_tracker._update() - - for state in self.simulations_tracker.getStates(): - self.progress.updateState(state.state, 100.0 * state.count / state.total_count) - self.legends[state].updateLegend(state.name, state.count, state.total_count) - - def updateRunStatus(self): - self.__status_label.setText(self._run_model.getPhaseName()) - - if self.checkIfRunFinished(): - self.total_progress.setProgress(self._run_model.getProgress()) - self.detailed_progress.set_progress(*self._run_model.getDetailedProgress()) - self.updateProgress() - return - - self.total_progress.setProgress(self._run_model.getProgress()) - - if self._run_model.isIndeterminate(): - self.progress.setIndeterminate(True) - states = self.simulations_tracker.getStates() - for state in states: - self.legends[state].updateLegend(state.name, 0, 0) - - else: - if self.detailed_progress and self.detailed_progress.isVisible(): - self.detailed_progress.set_progress(*self._run_model.getDetailedProgress()) - else: - self._run_model.updateDetailedProgress() #update information without rendering - - self.progress.setIndeterminate(False) - self.updateProgress() - - runtime = self._run_model.getRunningTime() - self.running_time.setText(SimulationsTracker.format_running_time(runtime)) + self.simulations_tracker.track() def killJobs(self): @@ -243,16 +179,46 @@ def killJobs(self): self.reject() return kill_job - - def hideKillAndShowDone(self): - self.__update_timer.stop() + @pyqtSlot(bool, str) + def _on_simulation_done(self, failed, failed_msg): + self.simulations_tracker.stop() self.processing_animation.hide() self.kill_button.setHidden(True) self.done_button.setHidden(False) - self.detailed_progress.set_progress(*self._run_model.getDetailedProgress()) - self.restart_button.setVisible(self.has_failed_realizations() ) + self.restart_button.setVisible(self.has_failed_realizations()) self.restart_button.setEnabled(self._run_model.support_restart) + if failed: + QMessageBox.critical(self, "Simulations failed!", + "The simulation failed with the following " + + "error:\n\n{}".format(failed_msg)) + + @pyqtSlot(object) + def _on_tracker_event(self, event): + if isinstance(event, TickEvent): + self.running_time.setText(format_running_time(event.runtime)) + + if isinstance(event, GeneralEvent): + self.total_progress.setProgress(event.progress) + self.progress.setIndeterminate(event.indeterminate) + + if event.indeterminate: + for state in event.sim_states: + self.legends[state].updateLegend(state.name, 0, 0) + else: + for state in event.sim_states: + self.progress.updateState( + state.state, 100.0 * state.count / state.total_count) + self.legends[state].updateLegend( + state.name, state.count, state.total_count) + + if isinstance(event, DetailedEvent): + if not self.progress.get_indeterminate(): + self.detailed_progress.set_progress(event.details, + event.iteration) + + if isinstance(event, EndEvent): + self.simulation_done.emit(event.failed, event.failed_msg) def has_failed_realizations(self): completed = self._run_model.completed_realizations_mask @@ -301,7 +267,7 @@ def restart_failed_realizations(self): self._simulations_argments['active_realizations'] = active_realizations self._simulations_argments['prev_successful_realizations'] = self._simulations_argments.get('prev_successful_realizations', 0) self._simulations_argments['prev_successful_realizations'] += self.count_successful_realizations() - self.startSimulation(self._simulations_argments) + self.startSimulation() diff --git a/ert_gui/simulation/simulation_panel.py b/ert_gui/simulation/simulation_panel.py index abffcfd4417..2bccbf18ace 100644 --- a/ert_gui/simulation/simulation_panel.py +++ b/ert_gui/simulation/simulation_panel.py @@ -111,8 +111,8 @@ def runSimulation(self): if start_simulations == QMessageBox.Yes: run_model = self.getCurrentSimulationModel() arguments = self.getSimulationArguments() - dialog = RunDialog(self._config_file, run_model(), self) - dialog.startSimulation( arguments ) + dialog = RunDialog(self._config_file, run_model(), arguments, self) + dialog.startSimulation() dialog.exec_() ERT.emitErtChange() # simulations may have added new cases. diff --git a/ert_shared/cli/main.py b/ert_shared/cli/main.py index 0c67c25724a..b72e292f043 100644 --- a/ert_shared/cli/main.py +++ b/ert_shared/cli/main.py @@ -9,7 +9,7 @@ from ert_shared.cli.monitor import Monitor from ert_shared.cli.notifier import ErtCliNotifier from ert_shared.cli.workflow import execute_workflow -from ert_shared.models import SimulationsTracker +from ert_shared.tracker.factory import create_tracker from res.enkf import EnKFMain, ResConfig @@ -39,7 +39,7 @@ def run_cli(args): ) thread.start() - tracker = SimulationsTracker(model=model) + tracker = create_tracker(model, tick_interval=0, detailed_interval=0) monitor = Monitor(color_always=args.color_always) try: diff --git a/ert_shared/cli/monitor.py b/ert_shared/cli/monitor.py index 2915c14a7e7..fed188e3d5d 100644 --- a/ert_shared/cli/monitor.py +++ b/ert_shared/cli/monitor.py @@ -6,7 +6,9 @@ from colors import color as ansi_color from console_progressbar import ProgressBar -from ert_shared.models import SimulationStateStatus, SimulationsTracker +from ert_shared.tracker.events import EndEvent, GeneralEvent +from ert_shared.tracker.state import SimulationStateStatus +from ert_shared.tracker.utils import format_running_time def _ansi_color(*args, **kwargs): @@ -46,14 +48,16 @@ def _no_color(self, *args, **kwargs): return args[0] def monitor(self, tracker): - for update in tracker.track(): - self._print_progress(update) - - self._print_result(tracker.run_failed, tracker.failed_message) - - def _get_legends(self, tracker): + for event in tracker.track(): + if isinstance(event, GeneralEvent): + self._print_progress(event) + if isinstance(event, EndEvent): + self._print_result(event.failed, event.failed_msg) + return + + def _get_legends(self, sim_states): legends = {} - for state in tracker.getStates(): + for state in sim_states: legends[state] = "{}{:10} {:>10}".format( self._colorize(self.dot, fg=state.color), state.name, "{}/{}".format(state.count, state.total_count) @@ -71,40 +75,35 @@ def _print_result(self, failed, failed_message): fg=SimulationStateStatus.COLOR_FINISHED), file=self._out) - def _print_progress(self, tracker): - """Print a progress based on the information on a SimulationTracker - instance @tracker.""" - if tracker.queue_size == 0: - # queue_size is 0, so no progress can be displayed + def _print_progress(self, event): + """Print a progress based on the information on a GeneralEvent.""" + if event.indeterminate: + # indeterminate, no progress to be shown return prefix = """ --> {phase_name} {current_phase}/{target}""".format( - phase_name=tracker.iteration_name, - current_phase=min(tracker.total_iterations, - tracker.current_iteration + 1), - target=tracker.total_iterations, + phase_name=event.phase_name, + current_phase=min(event.total_phases, + event.current_phase + 1), + target=event.total_phases, ) statuses = "" - done = 0 - legends = self._get_legends(tracker) - for state in tracker.getStates(): + legends = self._get_legends(event.sim_states) + for state in event.sim_states: statuses += " {}\n".format(legends[state]) - if state.name == "Finished": - done = state.count suffix = """{runtime} {statuses}""".format(statuses=statuses, - runtime=SimulationsTracker.format_running_time( - tracker.runtime),) + runtime=format_running_time(event.runtime)) pb = ProgressBar( - total=tracker.queue_size, prefix=prefix, suffix=suffix, decimals=0, + total=100, prefix=prefix, suffix=suffix, decimals=0, length=self.bar_length, fill=self.filled_bar_char, zfill=self.empty_bar_char, file=self._out ) - pb.print_progress_bar(done) + pb.print_progress_bar(event.progress * 100) diff --git a/ert_shared/models/__init__.py b/ert_shared/models/__init__.py index d0c2d1d7dea..e4ae75488bc 100644 --- a/ert_shared/models/__init__.py +++ b/ert_shared/models/__init__.py @@ -4,4 +4,3 @@ from .ensemble_smoother import EnsembleSmoother from .iterated_ensemble_smoother import IteratedEnsembleSmoother from .multiple_data_assimilation import MultipleDataAssimilation -from .simulations_tracker import SimulationsTracker, SimulationStateStatus diff --git a/ert_shared/models/base_run_model.py b/ert_shared/models/base_run_model.py index 3d6b99033fe..0cf44f1b096 100644 --- a/ert_shared/models/base_run_model.py +++ b/ert_shared/models/base_run_model.py @@ -35,7 +35,6 @@ def __init__(self, queue_config, phase_count=1): super(BaseRunModel, self).__init__() self._phase = 0 self._phase_count = phase_count - self._phase_update_count = 0 self._phase_name = "Starting..." self._job_start_time = 0 @@ -164,16 +163,12 @@ def setPhase(self, phase, phase_name, indeterminate=None): self._simulationEnded() self._phase = phase - self._phase_update_count = 0 + def stop_time(self): + return self._job_stop_time - def getRunningTime(self): - """ @rtype: float """ - if self._job_stop_time < self._job_start_time: - return time.time() - self._job_start_time - else: - return self._job_stop_time - self._job_start_time - + def start_time(self): + return self._job_start_time @job_queue(1) def getQueueSize(self): @@ -205,30 +200,6 @@ def isQueueRunning(self): """ @rtype: bool """ return self._job_queue.isRunning() - - def getProgress(self): - """ @rtype: float """ - if self.isFinished(): - current_progress = 1.0 - elif not self.isQueueRunning() and self._phase_update_count > 0: - current_progress = (self._phase + 1.0) / self._phase_count - else: - self._phase_update_count += 1 - queue_status = self.getQueueStatus() - queue_size = self.getQueueSize() - - done_state = JobStatusType.JOB_QUEUE_SUCCESS | JobStatusType.JOB_QUEUE_DONE - done_count = 0 - - for state in queue_status: - if state in done_state: - done_count += queue_status[state] - - phase_progress = float(done_count) / queue_size - current_progress = (self._phase + phase_progress) / self._phase_count - - return current_progress - @staticmethod def is_forward_model_finished(progress): return not (any((job.status != 'Success' for job in progress))) @@ -254,6 +225,13 @@ def updateDetailedProgress(self): if self._job_queue: status = self._job_queue.getJobStatus(queue_index) + if status in [ + JobStatusType.JOB_QUEUE_PENDING, + JobStatusType.JOB_QUEUE_SUBMITTED, + JobStatusType.JOB_QUEUE_WAITING + ]: + continue + fms = self.realization_progress[iteration].get(run_arg.iens, None) #Dont load from file if you are finished diff --git a/ert_shared/models/ensemble_experiment.py b/ert_shared/models/ensemble_experiment.py index 6ce3ae1ad24..e0848c23bef 100644 --- a/ert_shared/models/ensemble_experiment.py +++ b/ert_shared/models/ensemble_experiment.py @@ -55,6 +55,12 @@ def create_context(self, arguments): jobname_fmt, subst_list, itr) + + # Deleting a run_context removes the possibility to retrospectively + # determine detailed progress. Thus, before deletion, the detailed + # progress is stored. + self.updateDetailedProgress() + self._run_context = run_context self._last_run_iteration = run_context.get_iter() return run_context diff --git a/ert_shared/models/ensemble_smoother.py b/ert_shared/models/ensemble_smoother.py index dedb04be223..286c7e2e9f4 100644 --- a/ert_shared/models/ensemble_smoother.py +++ b/ert_shared/models/ensemble_smoother.py @@ -91,6 +91,11 @@ def create_context(self, arguments, prior_context = None): state = RealizationStateEnum.STATE_HAS_DATA | RealizationStateEnum.STATE_INITIALIZED mask = sim_fs.getStateMap().createMask(state) + # Deleting a run_context removes the possibility to retrospectively + # determine detailed progress. Thus, before deletion, the detailed + # progress is stored. + self.updateDetailedProgress() + run_context = ErtRunContext.ensemble_smoother( sim_fs, target_fs, mask, runpath_fmt, jobname_fmt, subst_list, itr) self._run_context = run_context self._last_run_iteration = run_context.get_iter() diff --git a/ert_shared/models/iterated_ensemble_smoother.py b/ert_shared/models/iterated_ensemble_smoother.py index cacc36dfec2..5e10eec891f 100644 --- a/ert_shared/models/iterated_ensemble_smoother.py +++ b/ert_shared/models/iterated_ensemble_smoother.py @@ -117,6 +117,11 @@ def create_context(self, arguments, itr, prior_context = None, rerun = False): else: target_fs = self.createTargetCaseFileSystem(itr + 1 , target_case_format) + # Deleting a run_context removes the possibility to retrospectively + # determine detailed progress. Thus, before deletion, the detailed + # progress is stored. + self.updateDetailedProgress() + run_context = ErtRunContext.ensemble_smoother( sim_fs, target_fs, mask, runpath_fmt, jobname_fmt, subst_list, itr) self._run_context = run_context self._last_run_iteration = run_context.get_iter() diff --git a/ert_shared/models/multiple_data_assimilation.py b/ert_shared/models/multiple_data_assimilation.py index 4abf8b36081..d7f60e0fede 100644 --- a/ert_shared/models/multiple_data_assimilation.py +++ b/ert_shared/models/multiple_data_assimilation.py @@ -172,6 +172,11 @@ def create_context(self, arguments, itr, prior_context = None, update = True): for index, run_realization in enumerate(self.initial_realizations_mask): mask[index] = mask[index] and run_realization + # Deleting a run_context removes the possibility to retrospectively + # determine detailed progress. Thus, before deletion, the detailed + # progress is stored. + self.updateDetailedProgress() + run_context = ErtRunContext.ensemble_smoother( sim_fs, target_fs, mask, runpath_fmt, jobname_fmt, subst_list, itr) self._run_context = run_context self._last_run_iteration = run_context.get_iter() diff --git a/ert_shared/models/simulations_tracker.py b/ert_shared/models/simulations_tracker.py deleted file mode 100644 index 28fd03db28a..00000000000 --- a/ert_shared/models/simulations_tracker.py +++ /dev/null @@ -1,219 +0,0 @@ -import time - -from math import ceil, trunc -from res.job_queue import JobStatusType - - -class SimulationStateStatus(object): - COLOR_WAITING = (164, 200, 255) - COLOR_PENDING = (190,174,212) - COLOR_RUNNING = (255,255,153) - COLOR_FAILED = (255, 200, 200) - COLOR_UNKNOWN = (128, 128, 128) - COLOR_FINISHED = (127,201,127) - COLOR_NOT_ACTIVE = (255, 255, 255) - - def __init__(self, name, state, color): - self.__name = name - self.__state = state - self.__color = color - - self.__count = 0 - self.__total_count = 1 - - @property - def name(self): - return self.__name - - @property - def state(self): - return self.__state - - @property - def color(self): - return self.__color - - @property - def count(self): - return self.__count - - @count.setter - def count(self, value): - self.__count = value - - @property - def total_count(self): - return self.__total_count - - @total_count.setter - def total_count(self, value): - self.__total_count = value - - -class SimulationsTracker(object): - """SimulationsTracker provide means for tracking a simulation.""" - def __init__(self, model=None, update_interval=0.2, emit_interval=5): - """Creates a SimulationTracker. Use @model if tracking is to be used. - The provided @model is then then polled each @update_interval. For each - @emit_interval, an _update_ is yielded.""" - super(SimulationsTracker, self).__init__() - - waiting_flag = JobStatusType.JOB_QUEUE_NOT_ACTIVE | JobStatusType.JOB_QUEUE_WAITING | JobStatusType.JOB_QUEUE_SUBMITTED - waiting_state = SimulationStateStatus("Waiting", waiting_flag, SimulationStateStatus.COLOR_WAITING) - - pending_flag = JobStatusType.JOB_QUEUE_PENDING - pending_state = SimulationStateStatus("Pending", pending_flag, SimulationStateStatus.COLOR_PENDING) - - running_flag = JobStatusType.JOB_QUEUE_RUNNING | JobStatusType.JOB_QUEUE_EXIT | JobStatusType.JOB_QUEUE_RUNNING_DONE_CALLBACK | JobStatusType.JOB_QUEUE_RUNNING_EXIT_CALLBACK - running_state = SimulationStateStatus("Running", running_flag, SimulationStateStatus.COLOR_RUNNING) - - # Failed also includes simulations which have been killed by the MAX_RUNTIME system. - failed_flag = JobStatusType.JOB_QUEUE_IS_KILLED | JobStatusType.JOB_QUEUE_DO_KILL - failed_flag |= JobStatusType.JOB_QUEUE_FAILED | JobStatusType.JOB_QUEUE_DO_KILL_NODE_FAILURE - failed_state = SimulationStateStatus("Failed", failed_flag, SimulationStateStatus.COLOR_FAILED) - - done_flag = JobStatusType.JOB_QUEUE_DONE | JobStatusType.JOB_QUEUE_SUCCESS - done_state = SimulationStateStatus("Finished", done_flag, SimulationStateStatus.COLOR_FINISHED) - - unknown_flag = JobStatusType.JOB_QUEUE_UNKNOWN - unknown_state = SimulationStateStatus("Unknown", unknown_flag, SimulationStateStatus.COLOR_UNKNOWN) - - self.states = [done_state, failed_state, unknown_state, running_state, pending_state, waiting_state] - self.custom_states = [done_state, failed_state, running_state, unknown_state, pending_state, waiting_state] - self.__checkForUnusedEnums() - - self.__current_iteration = 0 - self.__total_iterations = 0 - self.__iteration_name = "" - self.__runtime = 0 # seconds - self.__queue_size = 0 - self.__indeterminate = False - - self._update_interval = update_interval - self._emit_interval = emit_interval - - self._model = model - - def getStates(self): - """ @rtype: list[SimulationStateStatus] """ - return list(self.custom_states) - - def __checkForUnusedEnums(self): - for enum in JobStatusType.enums(): - # The status check routines can return this status; if e.g. the bjobs call fails, - # but a job will never get this status. - if enum == JobStatusType.JOB_QUEUE_STATUS_FAILURE: - continue - - - used = False - for state in self.states: - if enum in state.state: - used = True - - if not used: - raise AssertionError("Enum identifier '%s' not used!" % enum) - - def _update(self): - self.__current_iteration = self._model.currentPhase() - self.__total_iterations = self._model.phaseCount() - self.__queue_size = self._model.getQueueSize() - self.__iteration_name = self._model.getPhaseName() - self.__runtime = self._model.getRunningTime() - self.__indeterminate = self._model.isIndeterminate() - - queue_status = self._model.getQueueStatus() - - for state in self.getStates(): - state.count = 0 - state.total_count = self.__queue_size - - for queue_state in queue_status: - if queue_state in state.state: - state.count += queue_status[queue_state] - - def track(self): - """Tracks a model and provides _updates_, which currently is the - instance itself.""" - if self._model is None: - raise ValueError("no model to track") - - while not self._model.isFinished(): - self._update() - - runtime = trunc(self.__runtime) - if runtime % self._emit_interval == 0: - yield self - - # Sleep for a whole second so as to not emit multiple updates - # within this emit interval. - time.sleep(ceil(self._update_interval)) - - time.sleep(self._update_interval) - - # Simulations are done, do final update and emit. - self._update() - yield self - - @property - def run_failed(self): - return self._model.hasRunFailed() - - @property - def failed_message(self): - return self._model.getFailMessage() - - @property - def current_iteration(self): - return self.__current_iteration - - @property - def total_iterations(self): - return self.__total_iterations - - @property - def iteration_name(self): - return self.__iteration_name - - @property - def runtime(self): - return self.__runtime - - @property - def queue_size(self): - return self.__queue_size - - @property - def indeterminate(self): - return self.____indeterminate - - @staticmethod - def format_running_time(runtime): - """ @rtype: str """ - days = 0 - hours = 0 - minutes = 0 - seconds = trunc(runtime) - - if seconds >= 60: - minutes, seconds = divmod(seconds, 60) - - if minutes >= 60: - hours, minutes = divmod(minutes, 60) - - if hours >= 24: - days, hours = divmod(hours, 24) - - if days > 0: - layout = "Running time: {d} days {h} hours {m} minutes {s} seconds" - - elif hours > 0: - layout = "Running time: {h} hours {m} minutes {s} seconds" - - elif minutes > 0: - layout = "Running time: {m} minutes {s} seconds" - - else: - layout = "Running time: {s} seconds" - - return layout.format(d=days, h=hours, m=minutes, s=seconds) diff --git a/ert_shared/tracker/__init__.py b/ert_shared/tracker/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ert_shared/tracker/base.py b/ert_shared/tracker/base.py new file mode 100644 index 00000000000..43cb76bc051 --- /dev/null +++ b/ert_shared/tracker/base.py @@ -0,0 +1,180 @@ +import time + +from res.job_queue import JobStatusType + +from ert_shared.tracker.events import DetailedEvent, EndEvent, GeneralEvent, TickEvent +from ert_shared.tracker.state import SimulationStateStatus +from ert_shared.tracker.utils import calculate_progress + + +class BaseTracker(object): + """BaseTracker provides the basis for doing tracking.""" + + def __init__(self, model): + """Initialize the tracker for a @model. A model can be any + BaseRunModel-derived class.""" + self._model = model + + self._states = [] + self._custom_states = [] + self._bootstrap_states() + BaseTracker.__checkForUnusedEnums(self._states) + + # keep track of phases in the model using a mapping of phase index + # to data is not accessible in the model, but should be. + # TODO: rewrite the phases API in BaseRunModel so that this can go away + # see https://github.com/equinor/ert/issues/556 + self._phase_states = {} + + def _bootstrap_states(self): + waiting_flag = ( + JobStatusType.JOB_QUEUE_NOT_ACTIVE + | JobStatusType.JOB_QUEUE_WAITING + | JobStatusType.JOB_QUEUE_SUBMITTED + ) + waiting_state = SimulationStateStatus( + "Waiting", waiting_flag, SimulationStateStatus.COLOR_WAITING + ) + + pending_flag = JobStatusType.JOB_QUEUE_PENDING + pending_state = SimulationStateStatus( + "Pending", pending_flag, SimulationStateStatus.COLOR_PENDING + ) + + running_flag = ( + JobStatusType.JOB_QUEUE_RUNNING + | JobStatusType.JOB_QUEUE_EXIT + | JobStatusType.JOB_QUEUE_RUNNING_DONE_CALLBACK + | JobStatusType.JOB_QUEUE_RUNNING_EXIT_CALLBACK + ) + running_state = SimulationStateStatus( + "Running", running_flag, SimulationStateStatus.COLOR_RUNNING + ) + + # Failed also includes simulations which have been killed by the MAX_RUNTIME system. + failed_flag = ( + JobStatusType.JOB_QUEUE_IS_KILLED | JobStatusType.JOB_QUEUE_DO_KILL + ) + failed_flag |= ( + JobStatusType.JOB_QUEUE_FAILED + | JobStatusType.JOB_QUEUE_DO_KILL_NODE_FAILURE + ) + failed_state = SimulationStateStatus( + "Failed", failed_flag, SimulationStateStatus.COLOR_FAILED + ) + + done_flag = JobStatusType.JOB_QUEUE_DONE | JobStatusType.JOB_QUEUE_SUCCESS + done_state = SimulationStateStatus( + "Finished", done_flag, SimulationStateStatus.COLOR_FINISHED + ) + + unknown_flag = JobStatusType.JOB_QUEUE_UNKNOWN + unknown_state = SimulationStateStatus( + "Unknown", unknown_flag, SimulationStateStatus.COLOR_UNKNOWN + ) + + self._states = [ + done_state, + failed_state, + unknown_state, + running_state, + pending_state, + waiting_state, + ] + self._custom_states = [ + done_state, + failed_state, + running_state, + unknown_state, + pending_state, + waiting_state, + ] + + def _update_phase_map(self): + phase = self._model.currentPhase() + if phase not in self._phase_states: + # False indicates that we will not determine (yet) whether or not + # it has had activity (i.e. queue has run). + self._phase_states[phase] = False + + if self._model.isQueueRunning(): + # This phase has job queue activity. + self._phase_states[phase] = True + + def _tick_event(self): + if self._model.stop_time() < self._model.start_time(): + runtime = time.time() - self._model.start_time() + else: + runtime = self._model.stop_time() - self._model.start_time() + + return TickEvent(runtime) + + def _general_event(self): + self._update_phase_map() + + phase_name = self._model.getPhaseName() + phase = self._model.currentPhase() + phase_count = self._model.phaseCount() + queue_status = self._model.getQueueStatus() + + done_count = 0 + for state in self.get_states(): + state.count = 0 + state.total_count = self._model.getQueueSize() + + for queue_state in queue_status: + if queue_state in state.state: + state.count += queue_status[queue_state] + + if state.name == "Finished": + done_count = state.count + + progress = calculate_progress( + phase, + phase_count, + self._model.isFinished(), + self._model.isQueueRunning(), + self._model.getQueueSize(), + self._phase_states[phase], + done_count, + ) + + tick = self._tick_event() + return GeneralEvent( + phase_name, + phase, + phase_count, + progress, + self._model.isIndeterminate(), + self.get_states(), + tick.runtime, + ) + + def _detailed_event(self): + return DetailedEvent(*self._model.getDetailedProgress()) + + def _end_event(self): + return EndEvent(self._model.hasRunFailed(), self._model.getFailMessage()) + + @staticmethod + def __checkForUnusedEnums(states): + for enum in JobStatusType.enums(): + # The status check routines can return this status; if e.g. the bjobs call fails, + # but a job will never get this status. + if enum == JobStatusType.JOB_QUEUE_STATUS_FAILURE: + continue + + used = False + for state in states: + if enum in state.state: + used = True + + if not used: + raise AssertionError("Enum identifier '%s' not used!" % enum) + + def get_states(self): + """ @rtype: list[SimulationStateStatus] """ + return list(self._custom_states) + + def reset(self): + self._phase_states = {} diff --git a/ert_shared/tracker/blocking.py b/ert_shared/tracker/blocking.py new file mode 100644 index 00000000000..538f80c5c3d --- /dev/null +++ b/ert_shared/tracker/blocking.py @@ -0,0 +1,44 @@ +import time + +from ert_shared.tracker.base import BaseTracker + + +class BlockingTracker(BaseTracker): + """The BlockingTracker provide tracking for non-qt consumers.""" + + def __init__(self, model, tick_interval, general_interval, detailed_interval): + """See create_tracker for details.""" + super(BlockingTracker, self).__init__(model) + self._tick_interval = tick_interval + self._general_interval = general_interval + self._detailed_interval = detailed_interval + + def track(self): + """Tracks the model in a blocking manner. This method is a generator + and will yield events at the appropriate times.""" + tick = 0 + while not self._model.isFinished(): + if self._tick_interval and tick % self._tick_interval == 0: + yield self._tick_event() + if self._general_interval and tick % self._general_interval == 0: + yield self._general_event() + if self._detailed_interval and tick % self._detailed_interval == 0: + yield self._detailed_event() + + tick += 1 + time.sleep(1) + + # Simulation done, emit final updates + if self._tick_interval > 0: + yield self._tick_event() + + if self._general_interval > 0: + yield self._general_event() + + if self._detailed_interval > 0: + yield self._detailed_event() + + yield self._end_event() + + def stop(self): + raise NotImplementedError("cannot stop BlockingTracker") diff --git a/ert_shared/tracker/events.py b/ert_shared/tracker/events.py new file mode 100644 index 00000000000..b02e7797855 --- /dev/null +++ b/ert_shared/tracker/events.py @@ -0,0 +1,35 @@ +class TickEvent(object): + def __init__(self, runtime): + self.runtime = runtime + + +class GeneralEvent(object): + def __init__( + self, + phase_name, + current_phase, + total_phases, + progress, + indeterminate, + sim_states, + runtime, + ): + self.phase_name = phase_name + self.current_phase = current_phase + self.total_phases = total_phases + self.progress = progress + self.indeterminate = indeterminate + self.sim_states = sim_states + self.runtime = runtime + + +class DetailedEvent(object): + def __init__(self, details, iteration): + self.details = details + self.iteration = iteration + + +class EndEvent(object): + def __init__(self, failed, failed_msg=None): + self.failed = failed + self.failed_msg = failed_msg diff --git a/ert_shared/tracker/factory.py b/ert_shared/tracker/factory.py new file mode 100644 index 00000000000..8531fe09c5c --- /dev/null +++ b/ert_shared/tracker/factory.py @@ -0,0 +1,47 @@ +from ert_shared.tracker.blocking import BlockingTracker +from ert_shared.tracker.qt import QTimerTracker +from ert_shared.tracker.utils import scale_intervals + + +def create_tracker( + model, + tick_interval=1, + general_interval=5, + detailed_interval=10, + qtimer_cls=None, + event_handler=None, + num_realizations=None, +): + """Creates a tracker tracking a @model. The provided model + is updated in three tiers: @tick_interval, + @general_interval, @detailed_interval. Setting any + interval to <=0 disables update. + + Should a @qtimer_cls be defined, the Qt event loop will be used for + tracking. @event_handler must then be defined. + + If @num_realizations is defined, then the intervals are scaled + according to some affine transformation such that it is tractable to + do tracking. + """ + if num_realizations is not None: + general_interval, detailed_interval = scale_intervals(num_realizations) + + if qtimer_cls: + if not event_handler: + raise ValueError( + "event_handler must be defined if" + "qtimer_cls is defined" + ) + tracker = QTimerTracker( + model, + qtimer_cls, + tick_interval, + general_interval, + detailed_interval, + event_handler, + ) + else: + tracker = BlockingTracker( + model, tick_interval, general_interval, detailed_interval + ) + return tracker diff --git a/ert_shared/tracker/qt.py b/ert_shared/tracker/qt.py new file mode 100644 index 00000000000..cf2c0bac978 --- /dev/null +++ b/ert_shared/tracker/qt.py @@ -0,0 +1,72 @@ +from ert_shared.tracker.base import BaseTracker + + +class QTimerTracker(BaseTracker): + """The QTimerTracker provide tracking for Qt-based consumers using + QTimer.""" + + def __init__( + self, + model, + qtimer_cls, + tick_interval, + general_interval, + detailed_interval, + event_handler, + ): + """See create_tracker for details.""" + super(QTimerTracker, self).__init__(model) + self._qtimers = [] + self._event_handler = event_handler + + if tick_interval <= 0: + raise ValueError( + "the qt driven tracker requires ticks in order " + + "to check for completion" + ) + + timer = qtimer_cls() + timer.setInterval(tick_interval * 1000) + timer.timeout.connect(self._tick) + self._qtimers.append(timer) + + if general_interval > 0: + timer = qtimer_cls() + timer.setInterval(general_interval * 1000) + timer.timeout.connect(self._general) + self._qtimers.append(timer) + + if detailed_interval > 0: + timer = qtimer_cls() + timer.setInterval(detailed_interval * 1000) + timer.timeout.connect(self._detailed) + self._qtimers.append(timer) + + def _tick(self): + self._event_handler(self._tick_event()) + + # Check for completion. If Complete, emit all events including a final + # EndEvent. All timers stop after that. + if self._model.isFinished(): + self._general() + self._detailed() + self._end() + + self.stop() + + def _general(self): + self._event_handler(self._general_event()) + + def _detailed(self): + self._event_handler(self._detailed_event()) + + def _end(self): + self._event_handler(self._end_event()) + + def track(self): + for timer in self._qtimers: + timer.start() + + def stop(self): + for timer in self._qtimers: + timer.stop() diff --git a/ert_shared/tracker/state.py b/ert_shared/tracker/state.py new file mode 100644 index 00000000000..d93644fb667 --- /dev/null +++ b/ert_shared/tracker/state.py @@ -0,0 +1,44 @@ +class SimulationStateStatus(object): + COLOR_WAITING = (164, 200, 255) + COLOR_PENDING = (190, 174, 212) + COLOR_RUNNING = (255, 255, 153) + COLOR_FAILED = (255, 200, 200) + COLOR_UNKNOWN = (128, 128, 128) + COLOR_FINISHED = (127, 201, 127) + COLOR_NOT_ACTIVE = (255, 255, 255) + + def __init__(self, name, state, color): + self.__name = name + self.__state = state + self.__color = color + + self.__count = 0 + self.__total_count = 1 + + @property + def name(self): + return self.__name + + @property + def state(self): + return self.__state + + @property + def color(self): + return self.__color + + @property + def count(self): + return self.__count + + @count.setter + def count(self, value): + self.__count = value + + @property + def total_count(self): + return self.__total_count + + @total_count.setter + def total_count(self, value): + self.__total_count = value diff --git a/ert_shared/tracker/utils.py b/ert_shared/tracker/utils.py new file mode 100644 index 00000000000..923e6c65606 --- /dev/null +++ b/ert_shared/tracker/utils.py @@ -0,0 +1,68 @@ +import math + + +def calculate_progress( + phase, phase_count, finished, queue_running, queue_size, phase_has_run, done_count +): + if finished: + return 1.0 + if not queue_running and phase_has_run: + # queue is not running, but it has run for this phase, so it's done + return (phase + 1.0) / phase_count + else: + phase_progress = float(done_count) / queue_size + return (phase + phase_progress) / phase_count + + +def format_running_time(runtime): + """ @rtype: str """ + days = 0 + hours = 0 + minutes = 0 + seconds = math.trunc(runtime) + + if seconds >= 60: + minutes, seconds = divmod(seconds, 60) + + if minutes >= 60: + hours, minutes = divmod(minutes, 60) + + if hours >= 24: + days, hours = divmod(hours, 24) + + if days > 0: + layout = "Running time: {d} days {h} hours {m} minutes {s} seconds" + + elif hours > 0: + layout = "Running time: {h} hours {m} minutes {s} seconds" + + elif minutes > 0: + layout = "Running time: {m} minutes {s} seconds" + + else: + layout = "Running time: {s} seconds" + + return layout.format(d=days, h=hours, m=minutes, s=seconds) + + +def scale_intervals(reals): + scaled_gen = _scale(reals, min_time=1, max_time=5) + scaled_det = _scale(reals, min_time=1, max_time=15) + return math.trunc(scaled_gen), math.trunc(scaled_det) + + +def _scale(nr_realizations, min_time=5, max_time=15, min_real=1, max_real=500): + nr_realizations = min(max_real, nr_realizations) + nr_realizations = max(min_real, nr_realizations) + norm_real = _norm(min_real, max_real, nr_realizations) + + scaling_factor = _norm(_func(0), _func(1), _func(norm_real)) + return min_time + scaling_factor * (max_time - min_time) + + +def _norm(min_val, max_val, val): + return (val - min_val) / (max_val - min_val) + + +def _func(val): + return 1.0 * (1.0 + 500.0) ** val diff --git a/tests/cli/test_monitor.py b/tests/cli/test_monitor.py index 27e8ef3af9a..e129c31016c 100644 --- a/tests/cli/test_monitor.py +++ b/tests/cli/test_monitor.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- import sys import unittest -from ert_shared.models import BaseRunModel, SimulationsTracker +from ert_shared.tracker.events import GeneralEvent +from ert_shared.tracker.state import SimulationStateStatus from ert_shared.cli.monitor import Monitor @@ -10,6 +11,7 @@ else: from io import BytesIO as StringIO + class MonitorTest(unittest.TestCase): def test_color_always(self): @@ -20,13 +22,12 @@ def test_color_always(self): monitor._colorize("Foo", fg=(255, 0, 0))) def test_legends(self): - sim_tracker = SimulationsTracker() - done_state = sim_tracker.getStates()[0] # first is the Finished state - done_state.total_count = 100 + done_state = SimulationStateStatus("Finished", None, None) done_state.count = 10 + done_state.total_count = 100 monitor = Monitor(out=StringIO()) - legends = monitor._get_legends(sim_tracker) + legends = monitor._get_legends([done_state]) self.assertEqual("Finished 10/100", legends[done_state]) @@ -51,24 +52,23 @@ def test_result_failure(self): def test_print_progress(self): out = StringIO() - sim_tracker = SimulationsTracker(model=BaseRunModel(None)) monitor = Monitor(out=out) - sim_tracker._update() - sim_tracker.getStates()[0].count = 1 + states = [ + SimulationStateStatus("Finished", None, None), + SimulationStateStatus("Waiting", None, None), + ] + states[0].count = 10 + states[0].total_count = 100 + general_event = GeneralEvent("Test Phase", 0, 2, 0.5, False, states, 10) - monitor._print_progress(sim_tracker) + monitor._print_progress(general_event) self.assertEqual( """\r - --> Starting... + --> Test Phase - 1/1 |██████████████████████████████| 100% Running time: 0 seconds + 1/2 |███████████████ | 50% Running time: 10 seconds - Finished 1/1 - Failed 0/1 - Running 0/1 - Unknown 0/1 - Pending 0/1 + Finished 10/100 Waiting 0/1 - """, out.getvalue()) diff --git a/tests/models/test_base_run_model.py b/tests/models/test_base_run_model.py index 7de0d796736..82cbd14a697 100644 --- a/tests/models/test_base_run_model.py +++ b/tests/models/test_base_run_model.py @@ -1,16 +1,57 @@ -from tests import ErtTest -from res.enkf import EnKFMain -from res.test import ErtTestContext -from ert_shared.models import BaseRunModel +import sys +import unittest + from ert_gui.ertnotifier import configureErtNotifier +from ert_shared.models import BaseRunModel +from res.job_queue import JobStatusType +from res.test import ErtTestContext +from tests import ErtTest + +if sys.version_info >= (3, 3): + from unittest.mock import Mock, patch +else: + from mock import Mock, patch -class BaseRunModelTest(ErtTest): +class BaseRunModelTest(ErtTest): def test_instantiation(self): - config_file = self.createTestPath('local/simple_config/minimum_config') - with ErtTestContext('kjell', config_file) as work_area: + config_file = self.createTestPath("local/simple_config/minimum_config") + with ErtTestContext("kjell", config_file) as work_area: ert = work_area.getErt() configureErtNotifier(ert, config_file) - brm = BaseRunModel(ert.get_queue_config( )) + brm = BaseRunModel(ert.get_queue_config()) self.assertFalse(brm.isQueueRunning()) - self.assertTrue(brm.getProgress() >= 0) + + +class InMemoryBaseRunModelTest(unittest.TestCase): + def test_detailed_progress(self): + # TODO: rewrite to make use of fixtures + brm = BaseRunModel(None) + brm._run_context = Mock() + brm._run_context.get_iter.return_value = 0 + + run_arg1 = Mock() + run_arg1.getQueueIndex.return_value = 0 + run_arg2 = Mock() + run_arg2.getQueueIndex.return_value = 1 + run_arg2.iens = 0 + brm._run_context.__iter__ = Mock() + brm._run_context.__iter__.return_value = iter([run_arg1, run_arg2]) + + def job_status(queue_index): + if queue_index == 0: + return JobStatusType.JOB_QUEUE_PENDING + if queue_index == 1: + return JobStatusType.JOB_QUEUE_RUNNING + + brm._job_queue = Mock() + brm._job_queue.getJobStatus.side_effect = job_status + + with patch("ert_shared.models.base_run_model.ForwardModelStatus") as f: + f.load.return_value = Mock() + f.load.return_value.jobs = [{"name": "job1"}] + brm.updateDetailedProgress() + + jobs, status = brm.realization_progress[0][0] + self.assertEqual(len(jobs), 1) + self.assertIn("name", jobs[0]) diff --git a/tests/models/test_simulation_tracker.py b/tests/models/test_simulation_tracker.py deleted file mode 100644 index 20c6f04d35e..00000000000 --- a/tests/models/test_simulation_tracker.py +++ /dev/null @@ -1,37 +0,0 @@ -import unittest - -from ert_shared.models import BaseRunModel, SimulationsTracker - - -class SimulationTrackerTest(unittest.TestCase): - - def test_track(self): - brm = BaseRunModel(None) - tracker = SimulationsTracker( - update_interval=0, emit_interval=1, model=brm) - track_iter = tracker.track() - - self.assertEqual(tracker, next(track_iter), "no update") - - brm._phase = 1 # model now done, track should stop - self.assertEqual(tracker, next(track_iter), "no final update") - - with self.assertRaises(StopIteration, - msg="tracker did not stop"): - next(track_iter) - - def test_format_running_time(self): - tests = [ - {"seconds": 0, "expected": "Running time: 0 seconds"}, - {"seconds": 1, "expected": "Running time: 1 seconds"}, - {"seconds": 100, "expected": "Running time: 1 minutes 40 seconds"}, - {"seconds": 10000, "expected": "Running time: 2 hours 46 minutes 40 seconds"}, # noqa - {"seconds": 100000, "expected": "Running time: 1 days 3 hours 46 minutes 40 seconds"}, # noqa - {"seconds": 100000000, "expected": "Running time: 1157 days 9 hours 46 minutes 40 seconds"}, # noqa - ] - - for t in tests: - self.assertEqual( - t["expected"], - SimulationsTracker.format_running_time(t["seconds"]) - ) diff --git a/tests/tracker/__init__.py b/tests/tracker/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/tracker/test_base_tracker.py b/tests/tracker/test_base_tracker.py new file mode 100644 index 00000000000..5f43e6cb2b3 --- /dev/null +++ b/tests/tracker/test_base_tracker.py @@ -0,0 +1,55 @@ +import sys +import unittest + +from ert_shared.tracker.base import BaseTracker +from res.job_queue import JobStatusType + +if sys.version_info >= (3, 3): + from unittest.mock import Mock +else: + from mock import Mock + + +class BaseTrackerTest(unittest.TestCase): + def setUp(self): + self.model = Mock() + self.tracker = BaseTracker(self.model) + + def test_tick_event_generation(self): + self.model.start_time.return_value = 100 + self.model.stop_time.return_value = 200 + + tick_event = self.tracker._tick_event() + + self.assertEqual(100, tick_event.runtime) + + def test_general_event_generation(self): + self.model.getPhaseName.return_value = "Test Phase" + self.model.currentPhase.return_value = 1 + self.model.phaseCount.return_value = 2 + self.model.getQueueSize.return_value = 100 + self.model.isFinished.return_value = False + self.model.isQueueRunning.return_value = True + self.model.isIndeterminate.return_value = True + self.model.start_time.return_value = 100 + self.model.stop_time.return_value = 200 + self.model.getQueueStatus.return_value = { + JobStatusType.JOB_QUEUE_DONE: 50 + } + + general_event = self.tracker._general_event() + + self.assertEqual("Test Phase", general_event.phase_name) + self.assertEqual(1, general_event.current_phase) + self.assertEqual(2, general_event.total_phases) + self.assertEqual(0.75, general_event.progress) + self.assertEqual(True, general_event.indeterminate) + self.assertEqual(100, general_event.runtime) + + def test_detailed_event_generation(self): + self.model.getDetailedProgress.return_value = {}, -1 + + detailed_event = self.tracker._detailed_event() + + self.assertEqual({}, detailed_event.details) + self.assertEqual(-1, detailed_event.iteration) diff --git a/tests/tracker/test_blocking_tracker.py b/tests/tracker/test_blocking_tracker.py new file mode 100644 index 00000000000..97a928c3d50 --- /dev/null +++ b/tests/tracker/test_blocking_tracker.py @@ -0,0 +1,32 @@ +import unittest + +from ert_shared.models import BaseRunModel +from ert_shared.tracker.blocking import BlockingTracker +from ert_shared.tracker.events import (DetailedEvent, EndEvent, GeneralEvent, + TickEvent) + + +class BlockingTrackerTest(unittest.TestCase): + + def test_event_loop_runs(self): + brm = BaseRunModel(None, phase_count=1) + tracker = BlockingTracker(brm, 1, 1, 1) + + idx = 0 + expected = [TickEvent, GeneralEvent, DetailedEvent, TickEvent] + for event in tracker.track(): + self.assertIsInstance(event, expected[idx], + "got unexpected event") + + idx += 1 + if idx == 4: # End after recieving the second TickEvent + break + + def test_end_events_from_finished_model(self): + brm = BaseRunModel(None, phase_count=0) + tracker = BlockingTracker(brm, 1, 1, 1) + + events = list(tracker.track()) + for idx, ev_cls in enumerate([TickEvent, GeneralEvent, DetailedEvent, + EndEvent]): + self.assertIsInstance(events[idx], ev_cls) diff --git a/tests/tracker/test_factory.py b/tests/tracker/test_factory.py new file mode 100644 index 00000000000..1d0b049e645 --- /dev/null +++ b/tests/tracker/test_factory.py @@ -0,0 +1,24 @@ +import sys +import unittest + +from ert_shared.tracker.blocking import BlockingTracker +from ert_shared.tracker.factory import create_tracker +from ert_shared.tracker.qt import QTimerTracker + +if sys.version_info >= (3, 3): + from unittest.mock import Mock +else: + from mock import Mock + + +class TrackerFactoryTest(unittest.TestCase): + + def test_create_trackers(self): + q_tracker = create_tracker(None, qtimer_cls=Mock(), + event_handler=Mock()) + self.assertIsInstance(q_tracker, QTimerTracker, + "failed to create QTimerTracker") + + blocking_tracker = create_tracker(None) + self.assertIsInstance(blocking_tracker, BlockingTracker, + "failed to create BlockingTracker") diff --git a/tests/tracker/test_qtimer_tracker.py b/tests/tracker/test_qtimer_tracker.py new file mode 100644 index 00000000000..2e8e906adcf --- /dev/null +++ b/tests/tracker/test_qtimer_tracker.py @@ -0,0 +1,49 @@ +import sys +import unittest + +from ert_shared.models import BaseRunModel +from ert_shared.tracker.events import (DetailedEvent, EndEvent, GeneralEvent, + TickEvent) +from ert_shared.tracker.qt import QTimerTracker + +if sys.version_info >= (3, 3): + from unittest.mock import Mock +else: + from mock import Mock + + +class QTimerTrackerTest(unittest.TestCase): + + def test_qtimers_are_instantiated_and_setup(self): + intervals = [1, 2, 3] + tracker = QTimerTracker( + None, Mock, intervals[0], intervals[1], intervals[2], Mock()) + + self.assertEqual(3, len(tracker._qtimers), + "tracker did not create three timers") + for idx, interval in enumerate(intervals): + timer = tracker._qtimers[idx] + timer.setInterval.assert_called_once_with(interval*1000) + timer.timeout.connect.assert_called_once() + + def test_end_events_are_emitted(self): + event_handler = Mock() + brm = BaseRunModel(None, phase_count=0) # a finished model + tracker = QTimerTracker(brm, Mock, 1, 0, 0, event_handler) + + tracker._tick() + + for idx, ev_cls in enumerate([TickEvent, GeneralEvent, DetailedEvent, + EndEvent]): + _, args, _ = event_handler.mock_calls[idx] + self.assertIsInstance(args[0], ev_cls, + "called with unexpected event") + + def test_qtimers_are_stopped_for_finished_model(self): + brm = BaseRunModel(None, phase_count=0) # a finished model + tracker = QTimerTracker(brm, Mock, 1, 0, 0, Mock()) + + tracker._tick() + + for timer in tracker._qtimers: + timer.stop.assert_called_once() diff --git a/tests/tracker/test_utils.py b/tests/tracker/test_utils.py new file mode 100644 index 00000000000..802babec140 --- /dev/null +++ b/tests/tracker/test_utils.py @@ -0,0 +1,70 @@ +import unittest + +from ert_shared.tracker.utils import (calculate_progress, format_running_time, + scale_intervals) + + +class TrackerUtilsTest(unittest.TestCase): + + def test_format_running_time(self): + tests = [ + {"seconds": 0, "expected": "Running time: 0 seconds"}, + {"seconds": 1, "expected": "Running time: 1 seconds"}, + {"seconds": 100, "expected": "Running time: 1 minutes 40 seconds"}, + {"seconds": 10000, "expected": "Running time: 2 hours 46 minutes 40 seconds"}, # noqa + {"seconds": 100000, "expected": "Running time: 1 days 3 hours 46 minutes 40 seconds"}, # noqa + {"seconds": 100000000, "expected": "Running time: 1157 days 9 hours 46 minutes 40 seconds"}, # noqa + ] + + for t in tests: + self.assertEqual( + t["expected"], + format_running_time(t["seconds"]) + ) + + def test_scale_intervals(self): + tests = [ + {"reals": 1, "expected_gen": 1, "expected_det": 1}, + {"reals": 100, "expected_gen": 1, "expected_det": 1}, + {"reals": 500, "expected_gen": 5, "expected_det": 15}, + {"reals": 900, "expected_gen": 5, "expected_det": 15}, + {"reals": 1000, "expected_gen": 5, "expected_det": 15}, + ] + + for t in tests: + actual_gen, actual_det = scale_intervals(t["reals"]) + self.assertEqual( + t["expected_gen"], + actual_gen, + "failed to scale general to {} (was: {}) for {} reals".format( + t["expected_gen"], actual_gen, t["reals"] + ) + ) + self.assertEqual( + t["expected_det"], + actual_det, + "failed to scale detailed to {} (was: {}) for {} reals".format( + t["expected_det"], actual_det, t["reals"] + ) + ) + + def test_calculate_progress(self): + tests = [ + {"expected": 0.01, "phase": 0, "phase_count": 1, "finished": False, "queue_running": False, "queue_size": 100, "phase_has_run": False, "done_count": 1}, # noqa + {"expected": 1, "phase": 1, "phase_count": 1, "finished": True, "queue_running": False, "queue_size": 100, "phase_has_run": True, "done_count": 100}, # noqa + {"expected": 0.5, "phase": 0, "phase_count": 2, "finished": False, "queue_running": False, "queue_size": 100, "phase_has_run": True, "done_count": 100}, # noqa + {"expected": 0, "phase": 0, "phase_count": 2, "finished": False, "queue_running": False, "queue_size": 100, "phase_has_run": False, "done_count": 0}, # noqa + {"expected": 0.5, "phase": 0, "phase_count": 2, "finished": False, "queue_running": False, "queue_size": 100, "phase_has_run": True, "done_count": 0}, # noqa + ] + + for t in tests: + self.assertEqual(t["expected"], calculate_progress( + t["phase"], + t["phase_count"], + t["finished"], + t["queue_running"], + t["queue_size"], + t["phase_has_run"], + t["done_count"], + ) + )