Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion monai/deploy/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def parse_args(argv: Optional[List[str]] = None, default_command: Optional[str]
return args


def set_up_logging(level: str, config_path: Union[str, Path] = LOG_CONFIG_FILENAME):
def set_up_logging(level: Optional[str], config_path: Union[str, Path] = LOG_CONFIG_FILENAME):
"""Initializes the logger and sets up logging level.

Args:
Expand Down
40 changes: 29 additions & 11 deletions monai/deploy/core/app_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from argparse import Namespace
from typing import Optional
from typing import Dict, Optional

from .resource import Resource
from .runtime_env import RuntimeEnv
Expand All @@ -19,21 +18,40 @@
class AppContext:
    """A class to store the context of an application.

    Holds the merged runtime settings (paths, backend engine names, resource
    limits) resolved from explicit arguments with fallback to a
    :class:`RuntimeEnv`.
    """

    def __init__(self, args: Dict[str, str], runtime_env: Optional[RuntimeEnv] = None):
        # Accumulated arguments across all update() calls; later calls override
        # earlier values key-by-key.
        self.args: Dict[str, str] = {}
        # Set the runtime environment (source of default values).
        self.runtime_env = runtime_env or RuntimeEnv()

        # Set the graph engine here because it would be used in the constructor
        # of the Application class, so it cannot be updated in
        # Application.run() method.
        self.graph = args.get("graph") or self.runtime_env.graph

        self.update(args)

    def update(self, args: Dict[str, str]):
        """Update the context with new args.

        Args:
            args (Dict[str, str]): New argument values; keys absent from
                `args` keep their previously accumulated value, and keys
                absent everywhere fall back to the runtime environment.
        """
        # Merge the new args on top of the accumulated ones. After this call,
        # `self.args` holds the most recent value for every key, so all lookups
        # below go through `self.args` only.
        self.args.update(args)

        # Set the path to input/output/model and the working directory.
        self.input_path = self.args.get("input") or self.runtime_env.input
        self.output_path = self.args.get("output") or self.runtime_env.output
        self.model_path = self.args.get("model") or self.runtime_env.model
        self.workdir = self.args.get("workdir") or self.runtime_env.workdir

        # Set the backend engines except for the graph engine (fixed at
        # construction time; see __init__).
        self.datastore = self.args.get("datastore") or self.runtime_env.datastore
        self.executor = self.args.get("executor") or self.runtime_env.executor

        # Set resource limits
        # TODO(gigony): Add cli option to set resource limits
        self.resource = Resource()

    def __repr__(self):
        return (
            f"AppContext(graph={self.graph}, input_path={self.input_path}, output_path={self.output_path}, "
            f"model_path={self.model_path}, workdir={self.workdir}, datastore={self.datastore}, "
            f"executor={self.executor}, resource={self.resource})"
        )
137 changes: 123 additions & 14 deletions monai/deploy/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,26 @@ def __init__(
else:
self.path = get_class_file_path(self.__class__)

# Setup program arguments
if path is None:
argv = sys.argv
# Set the runtime environment
if str(self.path).startswith("<ipython-"):
self.in_ipython = True
else:
self.in_ipython = False

# Setup program arguments
if path is not None or self.in_ipython:
# If `path` is specified, it means that it is called by
# monai.deploy.utils.importutil.get_application() to get the package info.
# In this case, we should not parse the arguments from the command line.
# If `self.in_ipython` is True, it means that it is called by ipython environment.
# In both cases, we should not parse the arguments from the command line.
argv = [sys.executable, str(self.path)] # use default parameters
else:
argv = sys.argv

# Parse the command line arguments
args = parse_args(argv, default_command="exec")

context = AppContext(args, runtime_env)
context = AppContext(args.__dict__, runtime_env)

self._context: AppContext = context

Expand All @@ -119,11 +126,7 @@ def __init__(
self.compose()

if do_run:
# Set up logging (try to load `LOG_CONFIG_FILENAME` in the application folder)
# and run the application
app_log_config_path = self.path.parent / LOG_CONFIG_FILENAME
set_up_logging(args.log_level, config_path=app_log_config_path)
self.run()
self.run(log_level=args.log_level)

@classmethod
def __subclasshook__(cls, c: Type) -> bool:
Expand Down Expand Up @@ -316,10 +319,116 @@ def get_package_info(self, model_path: Union[str, Path] = "") -> Dict:
"pip-packages": pip_requirement_list,
}

def run(
    self,
    log_level: Optional[str] = None,
    input: Optional[str] = None,
    output: Optional[str] = None,
    model: Optional[str] = None,
    workdir: Optional[str] = None,
    datastore: Optional[str] = None,
    executor: Optional[str] = None,
) -> None:
    """Runs the application.

    This method accepts `log_level` to set the log level of the application.

    Other arguments are used to specify the `input`, `output`, `model`, `workdir`, `datastore`, and `executor`.
    (Cannot set `graph` because it is set and used by the constructor.)

    If those arguments are not specified, values in the application context will be used.

    This method is useful when you want to interactively run the application inside IPython (Jupyter Notebook).

    For example, you can run the following code in a notebook:

    >>> from pathlib import Path
    >>> from monai.deploy.core import (
    >>>     Application,
    >>>     DataPath,
    >>>     ExecutionContext,
    >>>     InputContext,
    >>>     IOType,
    >>>     Operator,
    >>>     OutputContext,
    >>>     input,
    >>>     output,
    >>>     resource,
    >>> )
    >>>
    >>> @input("path", DataPath, IOType.DISK)
    >>> @output("path", DataPath, IOType.DISK)
    >>> class FirstOperator(Operator):
    >>>     def compute(self, input: InputContext, output: OutputContext, context: ExecutionContext):
    >>>         print(f"First Operator. input:{input.get().path}, model:{context.models.get().path}")
    >>>         output_path = Path("output_first.txt")
    >>>         output_path.write_text("first output\\n")
    >>>         output.set(DataPath(output_path))
    >>>
    >>> @input("path", DataPath, IOType.DISK)
    >>> @output("path", DataPath, IOType.DISK)
    >>> class SecondOperator(Operator):
    >>>     def compute(self, input: InputContext, output: OutputContext, context: ExecutionContext):
    >>>         print(f"Second Operator. output:{output.get().path}, model:{context.models.get().path}")
    >>>         # The leaf operators can only read output DataPath and should not set output DataPath.
    >>>         output_path = output.get().path / "output_second.txt"
    >>>         output_path.write_text("second output\\n")
    >>>
    >>> class App(Application):
    >>>     def compose(self):
    >>>         first_op = FirstOperator()
    >>>         second_op = SecondOperator()
    >>>
    >>>         self.add_flow(first_op, second_op)
    >>>
    >>> if __name__ == "__main__":
    >>>     App(do_run=True)

    >>> app = App()
    >>> app.run(input="inp", output="out", model="model.pt")

    >>> !ls out

    Args:
        log_level (Optional[str]): A log level.
        input (Optional[str]): An input data path.
        output (Optional[str]): An output data path.
        model (Optional[str]): A model path.
        workdir (Optional[str]): A working directory path.
        datastore (Optional[str]): A datastore name.
        executor (Optional[str]): An executor name.
    """
    # Collect only the explicitly-provided arguments so that unspecified
    # values fall back to the existing application context.
    args = {
        name: value
        for name, value in (
            ("input", input),
            ("output", output),
            ("model", model),
            ("workdir", workdir),
            ("datastore", datastore),
            ("executor", executor),
        )
        if value is not None
    }

    # If no arguments are specified and the runtime is IPython, do not run the
    # application (the constructor may call run() implicitly; an interactive
    # user is expected to pass explicit paths).
    if not args and self.in_ipython:
        return

    # Update the application context with the provided overrides.
    app_context = self.context
    app_context.update(args)

    # Set up logging (try to load `LOG_CONFIG_FILENAME` in the application folder)
    # and run the application.
    app_log_config_path = self.path.parent / LOG_CONFIG_FILENAME
    set_up_logging(log_level, config_path=app_log_config_path)

    datastore_obj = DatastoreFactory.create(app_context.datastore)
    executor_obj = ExecutorFactory.create(app_context.executor, {"app": self, "datastore": datastore_obj})
    executor_obj.run()

@abstractmethod
def compose(self):
Expand Down
145 changes: 75 additions & 70 deletions monai/deploy/core/executors/single_process_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ def run(self):
input_path = os.path.abspath(self.app.context.input_path)
output_path = os.path.abspath(self.app.context.output_path)

# Create the output directory if it does not exist
if not os.path.exists(output_path):
os.makedirs(output_path, exist_ok=True)

# Store old pwd
old_pwd = os.getcwd()

Expand All @@ -73,76 +77,77 @@ def run(self):

g = self.app.graph

for op in g.gen_worklist():
op_exec_context = ExecutionContext(exec_context, op)

# Set source input for a label if op is a root node and (<data type>, <storage type>) == (DataPath, IOType.DISK)
is_root = g.is_root(op)
if is_root:
input_op_info = op.op_info
input_labels = input_op_info.get_labels(IO.INPUT)
for input_label in input_labels:
input_data_type = input_op_info.get_data_type(IO.INPUT, input_label)
input_storage_type = input_op_info.get_storage_type(IO.INPUT, input_label)
if issubclass(input_data_type, DataPath) and input_storage_type == IOType.DISK:
op_exec_context.input_context.set(DataPath(input_path, read_only=True), input_label)

# Set destination output for a label if op is a leaf node and (<data type>, <storage type>) == (DataPath, IOType.DISK)
is_leaf = g.is_leaf(op)
if is_leaf:
output_op_info = op.op_info
output_labels = output_op_info.get_labels(IO.OUTPUT)
for output_label in output_labels:
output_data_type = output_op_info.get_data_type(IO.OUTPUT, output_label)
output_storage_type = output_op_info.get_storage_type(IO.OUTPUT, output_label)
if issubclass(output_data_type, DataPath) and output_storage_type == IOType.DISK:
op_exec_context.output_context.set(DataPath(output_path, read_only=True), output_label)

# Change the current working directory to the working directory of the operator
# op_output_folder == f"{workdir}/operators/{op.uid}/{op_exec_context.get_execution_index()}/{IO.OUTPUT}"
relative_output_path = Path(op_exec_context.output_context.get_group_path(IO.OUTPUT)).relative_to("/")
op_output_folder = str(Path(workdir, relative_output_path))
os.makedirs(op_output_folder, exist_ok=True)
os.chdir(op_output_folder)

# Execute pre_compute()
print(Fore.BLUE + "Going to initiate execution of operator %s" % op.__class__.__name__ + Fore.RESET)
op.pre_compute()

# Execute compute()
print(
Fore.GREEN
+ "Executing operator %s " % op.__class__.__name__
+ Fore.YELLOW
+ "(Process ID: %s, Operator ID: %s)" % (os.getpid(), op.uid)
+ Fore.RESET
)
op.compute(op_exec_context.input_context, op_exec_context.output_context, op_exec_context)

# Execute post_compute()
print(Fore.BLUE + "Done performing execution of operator %s\n" % op.__class__.__name__ + Fore.RESET)
op.post_compute()

# Set input to next operator
next_ops = g.gen_next_operators(op)
for next_op in next_ops:
io_map = g.get_io_map(op, next_op)
if not io_map:
import inspect

raise IOMappingError(
f"No IO mappings found for {op.name} -> {next_op.name} in "
f"{inspect.getabsfile(self.app.__class__)}"
)

next_op_exec_context = ExecutionContext(exec_context, next_op)
for (out_label, in_labels) in io_map.items():
output = op_exec_context.output_context.get(out_label)
for in_label in in_labels:
next_op_exec_context.input_context.set(output, in_label)

# Restore pwd
os.chdir(old_pwd)
try:
for op in g.gen_worklist():
op_exec_context = ExecutionContext(exec_context, op)

# Set source input for a label if op is a root node and (<data type>,<storage type>) == (DataPath,IOType.DISK)
is_root = g.is_root(op)
if is_root:
input_op_info = op.op_info
input_labels = input_op_info.get_labels(IO.INPUT)
for input_label in input_labels:
input_data_type = input_op_info.get_data_type(IO.INPUT, input_label)
input_storage_type = input_op_info.get_storage_type(IO.INPUT, input_label)
if issubclass(input_data_type, DataPath) and input_storage_type == IOType.DISK:
op_exec_context.input_context.set(DataPath(input_path, read_only=True), input_label)

# Set destination output for a label if op is a leaf node and (<data type>,<storage type>) == (DataPath,IOType.DISK)
is_leaf = g.is_leaf(op)
if is_leaf:
output_op_info = op.op_info
output_labels = output_op_info.get_labels(IO.OUTPUT)
for output_label in output_labels:
output_data_type = output_op_info.get_data_type(IO.OUTPUT, output_label)
output_storage_type = output_op_info.get_storage_type(IO.OUTPUT, output_label)
if issubclass(output_data_type, DataPath) and output_storage_type == IOType.DISK:
op_exec_context.output_context.set(DataPath(output_path, read_only=True), output_label)

# Change the current working directory to the working directory of the operator
# op_output_folder == f"{workdir}/operators/{op.uid}/{op_exec_context.get_execution_index()}/{IO.OUTPUT}"
relative_output_path = Path(op_exec_context.output_context.get_group_path(IO.OUTPUT)).relative_to("/")
op_output_folder = str(Path(workdir, relative_output_path))
os.makedirs(op_output_folder, exist_ok=True)
os.chdir(op_output_folder)

# Execute pre_compute()
print(Fore.BLUE + "Going to initiate execution of operator %s" % op.__class__.__name__ + Fore.RESET)
op.pre_compute()

# Execute compute()
print(
Fore.GREEN
+ "Executing operator %s " % op.__class__.__name__
+ Fore.YELLOW
+ "(Process ID: %s, Operator ID: %s)" % (os.getpid(), op.uid)
+ Fore.RESET
)
op.compute(op_exec_context.input_context, op_exec_context.output_context, op_exec_context)

# Execute post_compute()
print(Fore.BLUE + "Done performing execution of operator %s\n" % op.__class__.__name__ + Fore.RESET)
op.post_compute()

# Set input to next operator
next_ops = g.gen_next_operators(op)
for next_op in next_ops:
io_map = g.get_io_map(op, next_op)
if not io_map:
import inspect

raise IOMappingError(
f"No IO mappings found for {op.name} -> {next_op.name} in "
f"{inspect.getabsfile(self.app.__class__)}"
)

next_op_exec_context = ExecutionContext(exec_context, next_op)
for (out_label, in_labels) in io_map.items():
output = op_exec_context.output_context.get(out_label)
for in_label in in_labels:
next_op_exec_context.input_context.set(output, in_label)
finally:
# Always restore pwd even if an exception is raised (This logic can be run in an IPython environment)
os.chdir(old_pwd)

# Remove a temporary workdir
old_pwd = os.getcwd()
Expand Down
Loading