Commit 04918a6

Merge pull request #113 from Project-MONAI/allow_prog_io
Allow app execution in Jupyter notebook
2 parents: aac3ab3 + e59cbab

5 files changed: +250 −99 lines


monai/deploy/cli/main.py

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ def parse_args(argv: Optional[List[str]] = None, default_command: Optional[str]
     return args


-def set_up_logging(level: str, config_path: Union[str, Path] = LOG_CONFIG_FILENAME):
+def set_up_logging(level: Optional[str], config_path: Union[str, Path] = LOG_CONFIG_FILENAME):
     """Initializes the logger and sets up logging level.

     Args:
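
With `level` now typed as `Optional[str]`, callers such as the reworked `Application.run()` below can forward a log level that may be `None`. A minimal sketch of the two call styles, assuming a `None` level defers to whatever default the logging config file provides (that fallback is not shown in this diff):

    from monai.deploy.cli.main import set_up_logging

    set_up_logging("DEBUG")  # explicit level, as before
    set_up_logging(None)     # now allowed: defer to the log config's default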

monai/deploy/core/app_context.py

Lines changed: 29 additions & 11 deletions
@@ -9,8 +9,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from argparse import Namespace
-from typing import Optional
+from typing import Dict, Optional

 from .resource import Resource
 from .runtime_env import RuntimeEnv
@@ -19,21 +18,40 @@
 class AppContext:
     """A class to store the context of an application."""

-    def __init__(self, args: Namespace, runtime_env: Optional[RuntimeEnv] = None):
+    def __init__(self, args: Dict[str, str], runtime_env: Optional[RuntimeEnv] = None):
+        # Set the args
+        self.args: Dict[str, str] = {}
         # Set the runtime environment
         self.runtime_env = runtime_env or RuntimeEnv()

+        # Set the graph engine here because it would be used in the constructor of Application class so cannot be
+        # updated in Application.run() method.
+        self.graph = args.get("graph") or self.runtime_env.graph
+
+        self.update(args)
+
+    def update(self, args: Dict[str, str]):
+        """Update the context with new args and runtime_env."""
+        # Update args
+        self.args.update(args)
+
         # Set the path to input/output/model
-        self.input_path = args.input or self.runtime_env.input
-        self.output_path = args.output or self.runtime_env.output
-        self.model_path = args.model or self.runtime_env.model
-        self.workdir = args.workdir or self.runtime_env.workdir
+        self.input_path = args.get("input") or self.args.get("input") or self.runtime_env.input
+        self.output_path = args.get("output") or self.args.get("output") or self.runtime_env.output
+        self.model_path = args.get("model") or self.args.get("model") or self.runtime_env.model
+        self.workdir = args.get("workdir") or self.args.get("workdir") or self.runtime_env.workdir

-        # Set the backend engines
-        self.graph = args.graph or self.runtime_env.graph
-        self.datastore = args.datastore or self.runtime_env.datastore
-        self.executor = args.executor or self.runtime_env.executor
+        # Set the backend engines except for the graph engine
+        self.datastore = args.get("datastore") or self.args.get("datastore") or self.runtime_env.datastore
+        self.executor = args.get("executor") or self.args.get("executor") or self.runtime_env.executor

         # Set resource limits
         # TODO(gigony): Add cli option to set resource limits
         self.resource = Resource()
+
+    def __repr__(self):
+        return (
+            f"AppContext(graph={self.graph}, input_path={self.input_path}, output_path={self.output_path}, "
+            f"model_path={self.model_path}, workdir={self.workdir}, datastore={self.datastore}, "
+            f"executor={self.executor}, resource={self.resource})"
+        )
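
`AppContext` now takes a plain dict instead of an `argparse.Namespace`, and `update()` lets later calls (notably `Application.run()` in a notebook) override paths and backend engines while `graph` stays fixed after construction. A rough usage sketch, assuming the default `RuntimeEnv` supplies fallbacks for any keys that are not passed; the paths are placeholders:

    from monai.deploy.core.app_context import AppContext

    context = AppContext({"input": "inp", "output": "out"})
    print(context)  # __repr__ now summarizes graph, paths, engines, and resources

    # Later, only the supplied keys are overridden; everything else is kept.
    context.update({"model": "model.pt", "workdir": "workdir"})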

monai/deploy/core/application.py

Lines changed: 123 additions & 14 deletions
@@ -94,19 +94,26 @@ def __init__(
         else:
             self.path = get_class_file_path(self.__class__)

-        # Setup program arguments
-        if path is None:
-            argv = sys.argv
+        # Set the runtime environment
+        if str(self.path).startswith("<ipython-"):
+            self.in_ipython = True
         else:
+            self.in_ipython = False
+
+        # Setup program arguments
+        if path is not None or self.in_ipython:
             # If `path` is specified, it means that it is called by
             # monai.deploy.utils.importutil.get_application() to get the package info.
-            # In this case, we should not parse the arguments from the command line.
+            # If `self.in_ipython` is True, it means that it is called by ipython environment.
+            # In both cases, we should not parse the arguments from the command line.
             argv = [sys.executable, str(self.path)]  # use default parameters
+        else:
+            argv = sys.argv

         # Parse the command line arguments
         args = parse_args(argv, default_command="exec")

-        context = AppContext(args, runtime_env)
+        context = AppContext(args.__dict__, runtime_env)

         self._context: AppContext = context
@@ -119,11 +126,7 @@ def __init__(
         self.compose()

         if do_run:
-            # Set up logging (try to load `LOG_CONFIG_FILENAME` in the application folder)
-            # and run the application
-            app_log_config_path = self.path.parent / LOG_CONFIG_FILENAME
-            set_up_logging(args.log_level, config_path=app_log_config_path)
-            self.run()
+            self.run(log_level=args.log_level)

     @classmethod
     def __subclasshook__(cls, c: Type) -> bool:
@@ -316,10 +319,116 @@ def get_package_info(self, model_path: Union[str, Path] = "") -> Dict:
             "pip-packages": pip_requirement_list,
         }

-    def run(self):
-        datastore = DatastoreFactory.create(self.context.datastore)
-        executor = ExecutorFactory.create(self.context.executor, {"app": self, "datastore": datastore})
-        executor.run()
+    def run(
+        self,
+        log_level: Optional[str] = None,
+        input: Optional[str] = None,
+        output: Optional[str] = None,
+        model: Optional[str] = None,
+        workdir: Optional[str] = None,
+        datastore: Optional[str] = None,
+        executor: Optional[str] = None,
+    ) -> None:
+        """Runs the application.
+
+        This method accepts `log_level` to set the log level of the application.
+
+        Other arguments are used to specify the `input`, `output`, `model`, `workdir`, `datastore`, and `executor`.
+        (Cannot set `graph` because it is set and used by the constructor.)
+
+        If those arguments are not specified, values in the application context will be used.
+
+        This method is useful when you want to interactively run the application inside IPython (Jupyter Notebook).
+
+        For example, you can run the following code in a notebook:
+
+        >>> from pathlib import Path
+        >>> from monai.deploy.core import (
+        >>>     Application,
+        >>>     DataPath,
+        >>>     ExecutionContext,
+        >>>     InputContext,
+        >>>     IOType,
+        >>>     Operator,
+        >>>     OutputContext,
+        >>>     input,
+        >>>     output,
+        >>>     resource,
+        >>> )
+        >>>
+        >>> @input("path", DataPath, IOType.DISK)
+        >>> @output("path", DataPath, IOType.DISK)
+        >>> class FirstOperator(Operator):
+        >>>     def compute(self, input: InputContext, output: OutputContext, context: ExecutionContext):
+        >>>         print(f"First Operator. input:{input.get().path}, model:{context.models.get().path}")
+        >>>         output_path = Path("output_first.txt")
+        >>>         output_path.write_text("first output\\n")
+        >>>         output.set(DataPath(output_path))
+        >>>
+        >>> @input("path", DataPath, IOType.DISK)
+        >>> @output("path", DataPath, IOType.DISK)
+        >>> class SecondOperator(Operator):
+        >>>     def compute(self, input: InputContext, output: OutputContext, context: ExecutionContext):
+        >>>         print(f"Second Operator. output:{output.get().path}, model:{context.models.get().path}")
+        >>>         # The leaf operators can only read output DataPath and should not set output DataPath.
+        >>>         output_path = output.get().path / "output_second.txt"
+        >>>         output_path.write_text("second output\\n")
+        >>>
+        >>> class App(Application):
+        >>>     def compose(self):
+        >>>         first_op = FirstOperator()
+        >>>         second_op = SecondOperator()
+        >>>
+        >>>         self.add_flow(first_op, second_op)
+        >>>
+        >>> if __name__ == "__main__":
+        >>>     App(do_run=True)
+
+        >>> app = App()
+        >>> app.run(input="inp", output="out", model="model.pt")
+
+        >>> !ls out
+
+        Args:
+            log_level (Optional[str]): A log level.
+            input (Optional[str]): An input data path.
+            output (Optional[str]): An output data path.
+            model (Optional[str]): A model path.
+            workdir (Optional[str]): A working directory path.
+            datastore (Optional[str]): A datastore path.
+            executor (Optional[str]): An executor name.
+        """
+        # Set arguments
+        args = {}
+        if input is not None:
+            args["input"] = input
+        if output is not None:
+            args["output"] = output
+        if model is not None:
+            args["model"] = model
+        if workdir is not None:
+            args["workdir"] = workdir
+        if datastore is not None:
+            args["datastore"] = datastore
+        if executor is not None:
+            args["executor"] = executor
+
+        # If no arguments are specified and if runtime is in IPython, do not run the application.
+        if len(args) == 0 and self.in_ipython:
+            return
+
+        # Update app context
+        app_context = self.context
+        app_context.update(args)
+
+        # Set up logging (try to load `LOG_CONFIG_FILENAME` in the application folder)
+        # and run the application
+        app_log_config_path = self.path.parent / LOG_CONFIG_FILENAME
+        set_up_logging(log_level, config_path=app_log_config_path)
+
+        datastore_obj = DatastoreFactory.create(app_context.datastore)
+        executor_obj = ExecutorFactory.create(app_context.executor, {"app": self, "datastore": datastore_obj})
+        executor_obj.run()

     @abstractmethod
     def compose(self):
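
The net effect in a notebook: `self.in_ipython` is detected from the `<ipython-...>` pseudo-path, so constructing the application in Jupyter composes the graph but `run()` returns without executing when no data arguments were passed, and execution starts only when `run()` is called explicitly. A hedged sketch of that flow, reusing the `App` class and the placeholder `inp`/`out`/`model.pt` paths from the docstring example above:

    # In a Jupyter cell, assuming App is the example application defined above.
    app = App(do_run=True)  # in IPython the constructor's run() call returns early: no data args were given
    app.run(input="inp", output="out", model="model.pt", log_level="INFO")  # now it actually executes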

monai/deploy/core/executors/single_process_executor.py

Lines changed: 75 additions & 70 deletions
@@ -48,6 +48,10 @@ def run(self):
         input_path = os.path.abspath(self.app.context.input_path)
         output_path = os.path.abspath(self.app.context.output_path)

+        # Create the output directory if it does not exist
+        if not os.path.exists(output_path):
+            os.makedirs(output_path, exist_ok=True)
+
         # Store old pwd
         old_pwd = os.getcwd()

@@ -73,76 +77,77 @@ def run(self):

         g = self.app.graph

-        for op in g.gen_worklist():
-            op_exec_context = ExecutionContext(exec_context, op)
-
-            # Set source input for a label if op is a root node and (<data type>, <storage type>) == (DataPath, IOType.DISK)
-            is_root = g.is_root(op)
-            if is_root:
-                input_op_info = op.op_info
-                input_labels = input_op_info.get_labels(IO.INPUT)
-                for input_label in input_labels:
-                    input_data_type = input_op_info.get_data_type(IO.INPUT, input_label)
-                    input_storage_type = input_op_info.get_storage_type(IO.INPUT, input_label)
-                    if issubclass(input_data_type, DataPath) and input_storage_type == IOType.DISK:
-                        op_exec_context.input_context.set(DataPath(input_path, read_only=True), input_label)
-
-            # Set destination output for a label if op is a leaf node and (<data type>, <storage type>) == (DataPath, IOType.DISK)
-            is_leaf = g.is_leaf(op)
-            if is_leaf:
-                output_op_info = op.op_info
-                output_labels = output_op_info.get_labels(IO.OUTPUT)
-                for output_label in output_labels:
-                    output_data_type = output_op_info.get_data_type(IO.OUTPUT, output_label)
-                    output_storage_type = output_op_info.get_storage_type(IO.OUTPUT, output_label)
-                    if issubclass(output_data_type, DataPath) and output_storage_type == IOType.DISK:
-                        op_exec_context.output_context.set(DataPath(output_path, read_only=True), output_label)
-
-            # Change the current working directory to the working directory of the operator
-            # op_output_folder == f"{workdir}/operators/{op.uid}/{op_exec_context.get_execution_index()}/{IO.OUTPUT}"
-            relative_output_path = Path(op_exec_context.output_context.get_group_path(IO.OUTPUT)).relative_to("/")
-            op_output_folder = str(Path(workdir, relative_output_path))
-            os.makedirs(op_output_folder, exist_ok=True)
-            os.chdir(op_output_folder)
-
-            # Execute pre_compute()
-            print(Fore.BLUE + "Going to initiate execution of operator %s" % op.__class__.__name__ + Fore.RESET)
-            op.pre_compute()
-
-            # Execute compute()
-            print(
-                Fore.GREEN
-                + "Executing operator %s " % op.__class__.__name__
-                + Fore.YELLOW
-                + "(Process ID: %s, Operator ID: %s)" % (os.getpid(), op.uid)
-                + Fore.RESET
-            )
-            op.compute(op_exec_context.input_context, op_exec_context.output_context, op_exec_context)
-
-            # Execute post_compute()
-            print(Fore.BLUE + "Done performing execution of operator %s\n" % op.__class__.__name__ + Fore.RESET)
-            op.post_compute()
-
-            # Set input to next operator
-            next_ops = g.gen_next_operators(op)
-            for next_op in next_ops:
-                io_map = g.get_io_map(op, next_op)
-                if not io_map:
-                    import inspect
-
-                    raise IOMappingError(
-                        f"No IO mappings found for {op.name} -> {next_op.name} in "
-                        f"{inspect.getabsfile(self.app.__class__)}"
-                    )
-
-                next_op_exec_context = ExecutionContext(exec_context, next_op)
-                for (out_label, in_labels) in io_map.items():
-                    output = op_exec_context.output_context.get(out_label)
-                    for in_label in in_labels:
-                        next_op_exec_context.input_context.set(output, in_label)
-
-        # Restore pwd
-        os.chdir(old_pwd)
+        try:
+            for op in g.gen_worklist():
+                op_exec_context = ExecutionContext(exec_context, op)
+
+                # Set source input for a label if op is a root node and (<data type>,<storage type>) == (DataPath,IOType.DISK)
+                is_root = g.is_root(op)
+                if is_root:
+                    input_op_info = op.op_info
+                    input_labels = input_op_info.get_labels(IO.INPUT)
+                    for input_label in input_labels:
+                        input_data_type = input_op_info.get_data_type(IO.INPUT, input_label)
+                        input_storage_type = input_op_info.get_storage_type(IO.INPUT, input_label)
+                        if issubclass(input_data_type, DataPath) and input_storage_type == IOType.DISK:
+                            op_exec_context.input_context.set(DataPath(input_path, read_only=True), input_label)
+
+                # Set destination output for a label if op is a leaf node and (<data type>,<storage type>) == (DataPath,IOType.DISK)
+                is_leaf = g.is_leaf(op)
+                if is_leaf:
+                    output_op_info = op.op_info
+                    output_labels = output_op_info.get_labels(IO.OUTPUT)
+                    for output_label in output_labels:
+                        output_data_type = output_op_info.get_data_type(IO.OUTPUT, output_label)
+                        output_storage_type = output_op_info.get_storage_type(IO.OUTPUT, output_label)
+                        if issubclass(output_data_type, DataPath) and output_storage_type == IOType.DISK:
+                            op_exec_context.output_context.set(DataPath(output_path, read_only=True), output_label)
+
+                # Change the current working directory to the working directory of the operator
+                # op_output_folder == f"{workdir}/operators/{op.uid}/{op_exec_context.get_execution_index()}/{IO.OUTPUT}"
+                relative_output_path = Path(op_exec_context.output_context.get_group_path(IO.OUTPUT)).relative_to("/")
+                op_output_folder = str(Path(workdir, relative_output_path))
+                os.makedirs(op_output_folder, exist_ok=True)
+                os.chdir(op_output_folder)
+
+                # Execute pre_compute()
+                print(Fore.BLUE + "Going to initiate execution of operator %s" % op.__class__.__name__ + Fore.RESET)
+                op.pre_compute()
+
+                # Execute compute()
+                print(
+                    Fore.GREEN
+                    + "Executing operator %s " % op.__class__.__name__
+                    + Fore.YELLOW
+                    + "(Process ID: %s, Operator ID: %s)" % (os.getpid(), op.uid)
+                    + Fore.RESET
+                )
+                op.compute(op_exec_context.input_context, op_exec_context.output_context, op_exec_context)
+
+                # Execute post_compute()
+                print(Fore.BLUE + "Done performing execution of operator %s\n" % op.__class__.__name__ + Fore.RESET)
+                op.post_compute()
+
+                # Set input to next operator
+                next_ops = g.gen_next_operators(op)
+                for next_op in next_ops:
+                    io_map = g.get_io_map(op, next_op)
+                    if not io_map:
+                        import inspect
+
+                        raise IOMappingError(
+                            f"No IO mappings found for {op.name} -> {next_op.name} in "
+                            f"{inspect.getabsfile(self.app.__class__)}"
+                        )
+
+                    next_op_exec_context = ExecutionContext(exec_context, next_op)
+                    for (out_label, in_labels) in io_map.items():
+                        output = op_exec_context.output_context.get(out_label)
+                        for in_label in in_labels:
+                            next_op_exec_context.input_context.set(output, in_label)
+        finally:
+            # Always restore pwd even if an exception is raised (This logic can be run in an IPython environment)
+            os.chdir(old_pwd)

         # Remove a temporary workdir
         old_pwd = os.getcwd()
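
Two behavioral changes here matter for notebook use: the executor now creates the output directory up front, and the working-directory switch is wrapped in try/finally so an operator failure no longer leaves the calling process (for example the Jupyter kernel) stranded in the operator's temporary workdir. A standalone sketch of that pattern, with hypothetical `run_in_folder`/`do_work` names used purely for illustration:

    import os

    def run_in_folder(op_output_folder: str, do_work) -> None:
        """Run do_work() inside op_output_folder, always restoring the caller's cwd."""
        os.makedirs(op_output_folder, exist_ok=True)  # mirror the executor's up-front directory creation
        old_pwd = os.getcwd()
        try:
            os.chdir(op_output_folder)
            do_work()
        finally:
            # Restore the original cwd even if do_work() raises, so an interactive
            # session (e.g. a Jupyter kernel) is not left in the temporary folder.
            os.chdir(old_pwd)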
