diff --git a/monai/deploy/cli/main.py b/monai/deploy/cli/main.py index 45e9ca79..ed7dbda1 100644 --- a/monai/deploy/cli/main.py +++ b/monai/deploy/cli/main.py @@ -78,7 +78,7 @@ def parse_args(argv: Optional[List[str]] = None, default_command: Optional[str] return args -def set_up_logging(level: str, config_path: Union[str, Path] = LOG_CONFIG_FILENAME): +def set_up_logging(level: Optional[str], config_path: Union[str, Path] = LOG_CONFIG_FILENAME): """Initializes the logger and sets up logging level. Args: diff --git a/monai/deploy/core/app_context.py b/monai/deploy/core/app_context.py index e083b062..a2656864 100644 --- a/monai/deploy/core/app_context.py +++ b/monai/deploy/core/app_context.py @@ -9,8 +9,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from argparse import Namespace -from typing import Optional +from typing import Dict, Optional from .resource import Resource from .runtime_env import RuntimeEnv @@ -19,21 +18,40 @@ class AppContext: """A class to store the context of an application.""" - def __init__(self, args: Namespace, runtime_env: Optional[RuntimeEnv] = None): + def __init__(self, args: Dict[str, str], runtime_env: Optional[RuntimeEnv] = None): + # Set the args + self.args: Dict[str, str] = {} # Set the runtime environment self.runtime_env = runtime_env or RuntimeEnv() + # Set the graph engine here because it would be used in the constructor of Application class so cannot be + # updated in Application.run() method. + self.graph = args.get("graph") or self.runtime_env.graph + + self.update(args) + + def update(self, args: Dict[str, str]): + """Update the context with new args and runtime_env.""" + # Update args + self.args.update(args) + # Set the path to input/output/model - self.input_path = args.input or self.runtime_env.input - self.output_path = args.output or self.runtime_env.output - self.model_path = args.model or self.runtime_env.model - self.workdir = args.workdir or self.runtime_env.workdir + self.input_path = args.get("input") or self.args.get("input") or self.runtime_env.input + self.output_path = args.get("output") or self.args.get("output") or self.runtime_env.output + self.model_path = args.get("model") or self.args.get("model") or self.runtime_env.model + self.workdir = args.get("workdir") or self.args.get("workdir") or self.runtime_env.workdir - # Set the backend engines - self.graph = args.graph or self.runtime_env.graph - self.datastore = args.datastore or self.runtime_env.datastore - self.executor = args.executor or self.runtime_env.executor + # Set the backend engines except for the graph engine + self.datastore = args.get("datastore") or self.args.get("datastore") or self.runtime_env.datastore + self.executor = args.get("executor") or self.args.get("executor") or self.runtime_env.executor # Set resource limits # TODO(gigony): Add cli option to set resource limits self.resource = Resource() + + def __repr__(self): + return ( + f"AppContext(graph={self.graph}, input_path={self.input_path}, output_path={self.output_path}, " + f"model_path={self.model_path}, workdir={self.workdir}, datastore={self.datastore}, " + f"executor={self.executor}, resource={self.resource})" + ) diff --git a/monai/deploy/core/application.py b/monai/deploy/core/application.py index 19c1962d..63aeb1ff 100644 --- a/monai/deploy/core/application.py +++ b/monai/deploy/core/application.py @@ -94,19 +94,26 @@ def __init__( else: self.path = get_class_file_path(self.__class__) - # Setup program arguments - if path is None: - argv = sys.argv + # Set the runtime environment + if str(self.path).startswith(" bool: @@ -316,10 +319,116 @@ def get_package_info(self, model_path: Union[str, Path] = "") -> Dict: "pip-packages": pip_requirement_list, } - def run(self): - datastore = DatastoreFactory.create(self.context.datastore) - executor = ExecutorFactory.create(self.context.executor, {"app": self, "datastore": datastore}) - executor.run() + def run( + self, + log_level: Optional[str] = None, + input: Optional[str] = None, + output: Optional[str] = None, + model: Optional[str] = None, + workdir: Optional[str] = None, + datastore: Optional[str] = None, + executor: Optional[str] = None, + ) -> None: + """Runs the application. + + This method accepts `log_level` to set the log level of the application. + + Other arguments are used to specify the `input`, `output`, `model`, `workdir`, `datastore`, and `executor`. + (Cannot set `graph` because it is set and used by the constructor.) + + If those arguments are not specified, values in the application context will be used. + + This method is useful when you want to interactively run the application inside IPython (Jupyter Notebook). + + For example, you can run the following code in a notebook: + + >>> from pathlib import Path + >>> from monai.deploy.core import ( + >>> Application, + >>> DataPath, + >>> ExecutionContext, + >>> InputContext, + >>> IOType, + >>> Operator, + >>> OutputContext, + >>> input, + >>> output, + >>> resource, + >>> ) + >>> + >>> @input("path", DataPath, IOType.DISK) + >>> @output("path", DataPath, IOType.DISK) + >>> class FirstOperator(Operator): + >>> def compute(self, input: InputContext, output: OutputContext, context: ExecutionContext): + >>> print(f"First Operator. input:{input.get().path}, model:{context.models.get().path}") + >>> output_path = Path("output_first.txt") + >>> output_path.write_text("first output\\n") + >>> output.set(DataPath(output_path)) + >>> + >>> @input("path", DataPath, IOType.DISK) + >>> @output("path", DataPath, IOType.DISK) + >>> class SecondOperator(Operator): + >>> def compute(self, input: InputContext, output: OutputContext, context: ExecutionContext): + >>> print(f"First Operator. output:{output.get().path}, model:{context.models.get().path}") + >>> # The leaf operators can only read output DataPath and should not set output DataPath. + >>> output_path = output.get().path / "output_second.txt" + >>> output_path.write_text("second output\\n") + >>> + >>> class App(Application): + >>> def compose(self): + >>> first_op = FirstOperator() + >>> second_op = SecondOperator() + >>> + >>> self.add_flow(first_op, second_op) + >>> + >>> if __name__ == "__main__": + >>> App(do_run=True) + + >>> app = App() + >>> app.run(input="inp", output="out", model="model.pt") + + >>> !ls out + + Args: + log_level (Optional[str]): A log level. + input (Optional[str]): An input data path. + output (Optional[str]): An output data path. + model (Optional[str]): A model path. + workdir (Optional[str]): A working directory path. + datastore (Optional[str]): A datastore path. + executor (Optional[str]): An executor name. + """ + # Set arguments + args = {} + if input is not None: + args["input"] = input + if output is not None: + args["output"] = output + if model is not None: + args["model"] = model + if workdir is not None: + args["workdir"] = workdir + if datastore is not None: + args["datastore"] = datastore + if executor is not None: + args["executor"] = executor + + # If no arguments are specified and if runtime is in IPython, do not run the application. + if len(args) == 0 and self.in_ipython: + return + + # Update app context + app_context = self.context + app_context.update(args) + + # Set up logging (try to load `LOG_CONFIG_FILENAME` in the application folder) + # and run the application + app_log_config_path = self.path.parent / LOG_CONFIG_FILENAME + set_up_logging(log_level, config_path=app_log_config_path) + + datastore_obj = DatastoreFactory.create(app_context.datastore) + executor_obj = ExecutorFactory.create(app_context.executor, {"app": self, "datastore": datastore_obj}) + executor_obj.run() @abstractmethod def compose(self): diff --git a/monai/deploy/core/executors/single_process_executor.py b/monai/deploy/core/executors/single_process_executor.py index dead14c5..d2079141 100644 --- a/monai/deploy/core/executors/single_process_executor.py +++ b/monai/deploy/core/executors/single_process_executor.py @@ -48,6 +48,10 @@ def run(self): input_path = os.path.abspath(self.app.context.input_path) output_path = os.path.abspath(self.app.context.output_path) + # Create the output directory if it does not exist + if not os.path.exists(output_path): + os.makedirs(output_path, exist_ok=True) + # Store old pwd old_pwd = os.getcwd() @@ -73,76 +77,77 @@ def run(self): g = self.app.graph - for op in g.gen_worklist(): - op_exec_context = ExecutionContext(exec_context, op) - - # Set source input for a label if op is a root node and (, ) == (DataPath, IOType.DISK) - is_root = g.is_root(op) - if is_root: - input_op_info = op.op_info - input_labels = input_op_info.get_labels(IO.INPUT) - for input_label in input_labels: - input_data_type = input_op_info.get_data_type(IO.INPUT, input_label) - input_storage_type = input_op_info.get_storage_type(IO.INPUT, input_label) - if issubclass(input_data_type, DataPath) and input_storage_type == IOType.DISK: - op_exec_context.input_context.set(DataPath(input_path, read_only=True), input_label) - - # Set destination output for a label if op is a leaf node and (, ) == (DataPath, IOType.DISK) - is_leaf = g.is_leaf(op) - if is_leaf: - output_op_info = op.op_info - output_labels = output_op_info.get_labels(IO.OUTPUT) - for output_label in output_labels: - output_data_type = output_op_info.get_data_type(IO.OUTPUT, output_label) - output_storage_type = output_op_info.get_storage_type(IO.OUTPUT, output_label) - if issubclass(output_data_type, DataPath) and output_storage_type == IOType.DISK: - op_exec_context.output_context.set(DataPath(output_path, read_only=True), output_label) - - # Change the current working directory to the working directory of the operator - # op_output_folder == f"{workdir}/operators/{op.uid}/{op_exec_context.get_execution_index()}/{IO.OUTPUT}" - relative_output_path = Path(op_exec_context.output_context.get_group_path(IO.OUTPUT)).relative_to("/") - op_output_folder = str(Path(workdir, relative_output_path)) - os.makedirs(op_output_folder, exist_ok=True) - os.chdir(op_output_folder) - - # Execute pre_compute() - print(Fore.BLUE + "Going to initiate execution of operator %s" % op.__class__.__name__ + Fore.RESET) - op.pre_compute() - - # Execute compute() - print( - Fore.GREEN - + "Executing operator %s " % op.__class__.__name__ - + Fore.YELLOW - + "(Process ID: %s, Operator ID: %s)" % (os.getpid(), op.uid) - + Fore.RESET - ) - op.compute(op_exec_context.input_context, op_exec_context.output_context, op_exec_context) - - # Execute post_compute() - print(Fore.BLUE + "Done performing execution of operator %s\n" % op.__class__.__name__ + Fore.RESET) - op.post_compute() - - # Set input to next operator - next_ops = g.gen_next_operators(op) - for next_op in next_ops: - io_map = g.get_io_map(op, next_op) - if not io_map: - import inspect - - raise IOMappingError( - f"No IO mappings found for {op.name} -> {next_op.name} in " - f"{inspect.getabsfile(self.app.__class__)}" - ) - - next_op_exec_context = ExecutionContext(exec_context, next_op) - for (out_label, in_labels) in io_map.items(): - output = op_exec_context.output_context.get(out_label) - for in_label in in_labels: - next_op_exec_context.input_context.set(output, in_label) - - # Restore pwd - os.chdir(old_pwd) + try: + for op in g.gen_worklist(): + op_exec_context = ExecutionContext(exec_context, op) + + # Set source input for a label if op is a root node and (,) == (DataPath,IOType.DISK) + is_root = g.is_root(op) + if is_root: + input_op_info = op.op_info + input_labels = input_op_info.get_labels(IO.INPUT) + for input_label in input_labels: + input_data_type = input_op_info.get_data_type(IO.INPUT, input_label) + input_storage_type = input_op_info.get_storage_type(IO.INPUT, input_label) + if issubclass(input_data_type, DataPath) and input_storage_type == IOType.DISK: + op_exec_context.input_context.set(DataPath(input_path, read_only=True), input_label) + + # Set destination output for a label if op is a leaf node and (,) == (DataPath,IOType.DISK) + is_leaf = g.is_leaf(op) + if is_leaf: + output_op_info = op.op_info + output_labels = output_op_info.get_labels(IO.OUTPUT) + for output_label in output_labels: + output_data_type = output_op_info.get_data_type(IO.OUTPUT, output_label) + output_storage_type = output_op_info.get_storage_type(IO.OUTPUT, output_label) + if issubclass(output_data_type, DataPath) and output_storage_type == IOType.DISK: + op_exec_context.output_context.set(DataPath(output_path, read_only=True), output_label) + + # Change the current working directory to the working directory of the operator + # op_output_folder == f"{workdir}/operators/{op.uid}/{op_exec_context.get_execution_index()}/{IO.OUTPUT}" + relative_output_path = Path(op_exec_context.output_context.get_group_path(IO.OUTPUT)).relative_to("/") + op_output_folder = str(Path(workdir, relative_output_path)) + os.makedirs(op_output_folder, exist_ok=True) + os.chdir(op_output_folder) + + # Execute pre_compute() + print(Fore.BLUE + "Going to initiate execution of operator %s" % op.__class__.__name__ + Fore.RESET) + op.pre_compute() + + # Execute compute() + print( + Fore.GREEN + + "Executing operator %s " % op.__class__.__name__ + + Fore.YELLOW + + "(Process ID: %s, Operator ID: %s)" % (os.getpid(), op.uid) + + Fore.RESET + ) + op.compute(op_exec_context.input_context, op_exec_context.output_context, op_exec_context) + + # Execute post_compute() + print(Fore.BLUE + "Done performing execution of operator %s\n" % op.__class__.__name__ + Fore.RESET) + op.post_compute() + + # Set input to next operator + next_ops = g.gen_next_operators(op) + for next_op in next_ops: + io_map = g.get_io_map(op, next_op) + if not io_map: + import inspect + + raise IOMappingError( + f"No IO mappings found for {op.name} -> {next_op.name} in " + f"{inspect.getabsfile(self.app.__class__)}" + ) + + next_op_exec_context = ExecutionContext(exec_context, next_op) + for (out_label, in_labels) in io_map.items(): + output = op_exec_context.output_context.get(out_label) + for in_label in in_labels: + next_op_exec_context.input_context.set(output, in_label) + finally: + # Always restore pwd even if an exception is raised (This logic can be run in an IPython environment) + os.chdir(old_pwd) # Remove a temporary workdir old_pwd = os.getcwd() diff --git a/monai/deploy/utils/importutil.py b/monai/deploy/utils/importutil.py index 47de77b4..6a889d59 100644 --- a/monai/deploy/utils/importutil.py +++ b/monai/deploy/utils/importutil.py @@ -96,9 +96,28 @@ def get_application(path: Union[str, Path]) -> Optional["Application"]: return None -def get_class_file_path(cls) -> Path: - """Get the file path of a class.""" - return Path(inspect.getfile(cls)) +def get_class_file_path(cls: Type) -> Path: + """Get the file path of a class. + + If file path is not available, it tries to get the internal class name used in IPython(starting with ). + + Args: + cls (Type): A class to get file path from. + + Returns: + A file path of the class. + """ + + try: + return Path(inspect.getfile(cls)) + except TypeError: + # If in IPython shell, use inspect.stack() to get the caller's file path + stack = inspect.stack() + for frame in stack: + if frame.filename.startswith("