diff --git a/CMakeLists.txt b/CMakeLists.txt index f8d8a4bf7f55..abe999428be1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -389,6 +389,12 @@ if(GTEST_INCLUDE_DIR AND GTEST_LIB) include(GoogleTest) endif() +if(USE_PIPELINE_EXECUTOR) + message(STATUS "Build with Pipeline Executor support...") + file(GLOB RUNTIME_PIPELINE_SRCS src/runtime/pipeline/*.cc) + list(APPEND RUNTIME_SRCS ${RUNTIME_PIPELINE_SRCS}) +endif(USE_PIPELINE_EXECUTOR) + # Module rules include(cmake/modules/VTA.cmake) include(cmake/modules/StandaloneCrt.cmake) diff --git a/cmake/config.cmake b/cmake/config.cmake index 8d8186c1b4f0..0ab498695fbf 100644 --- a/cmake/config.cmake +++ b/cmake/config.cmake @@ -105,6 +105,9 @@ set(USE_GRAPH_EXECUTOR ON) # Whether enable tiny graph executor with CUDA Graph set(USE_GRAPH_EXECUTOR_CUDA_GRAPH OFF) +# Whether enable pipeline executor. +set(USE_PIPELINE_EXECUTOR OFF) + # Whether to enable the profiler for the graph executor and vm set(USE_PROFILER ON) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py new file mode 100644 index 000000000000..36c03891d210 --- /dev/null +++ b/python/tvm/contrib/pipeline_executor.py @@ -0,0 +1,543 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Pipeline executor that executes a series of modules in a pipeline fashion.""" +import json +import tvm._ffi +from tvm import relay +from tvm.relay.transform import InferType +from tvm.contrib import graph_executor + + +def pipeline_executor_enabled(): + """Check if the pipeline executor is enabled. + + Return + ------- + enable: bool + Return whether the pipeline executor is enabled. + """ + return tvm._ffi.get_global_func("tvm.pipeline_executor.create", allow_missing=True) is not None + + +def build(pipe_configs): + """Build modules used in the pipeline executor, then use these modules and configuration + to create a pipeline executor. + + Parameters + ---------- + pipe_configs: PipelineConfig + Build Configuration information. + + Returns + ------- + ret: PipelineExecutorFactoryModule + Common interface for pipeline executor factory modules. + """ + mods = {} + mod_n_configs = pipe_configs.get_config() + config_len = len(mod_n_configs) + string_config = [{} for _ in range(config_len)] + for ir_mod, mod_config in mod_n_configs.items(): + mconf = mod_config["pipeline"].copy() + mod_idx = mconf["mod_idx"] - 1 + dev = mod_config["dev"] + target = mod_config["target"] + build_func = relay.build + # Check whether there is a customized build function. + if "build" in mod_config and mod_config["build"]: + build_func = mod_config["build"] + + mod = build_func( + ir_mod, + target, + params=mod_config["params"], + target_host=mod_config["target_host"], + mod_name=mod_config["mod_name"], + ) + + mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id) + # Create a pipeline configuration. + string_config[mod_idx] = mconf + mods[mod] = {"dev": dev} + + return PipelineExecutorFactoryModule(mods, string_config) + + +class PipelineModule(object): + """Wrapper of runtime module, caller can use this module to set parameters and get outputs. + + Parameters + ---------- + module : PipelineExecutorFactoryModule + Common interface for pipeline executor factory modules. + """ + + def __init__(self, module): + self.module = module.module + + +class PipelineConfig(object): + """Pipeline configuration information, this class contains the DAG that expresses + the dependency of each module involved in a pipeline and the parameters for building + each module. + """ + + class Binding: + """This class defines the module connections information. + The type can only be "input" or "output". + + Parameters + ---------- + owner : ModuleWrapper + The class who owns this interface. + + io_type : str + The I/O type of this interface. It can only be "input" or "output". + + name : str/integer + Name, for input it is string such as "data0", for output it is an integer such as 0. + + data_type: TensorType + The data type of this interface. + """ + + def __init__(self, owner, io_type, name, data_type=None): + self.io_owner = owner + self.io_type = io_type + self.name = str(name) + # Child interfaces that depend on this interface. + self.bindings = [] + # Parents interfaces that this interface depend on. + self.parents = [] + + self.data_type = data_type + + def get_name(self): + # Return name of this interface and the name of owner who owns this interface. + owner_name = "" + if isinstance(self.io_owner, PipelineConfig.ModuleWrapper): + owner_name = self.io_owner.name + + return owner_name, self.name + + def get_owner_idx(self): + # If the owner is ModuleWrapper return the owner index, if not return 0. + if isinstance(self.io_owner, PipelineConfig.ModuleWrapper): + return self.io_owner.idx + + return 0 + + def is_global_interface(self): + """The global interface is the interface visible to the caller which use a pipeline + executor, the global input interface is responsible for passing parameters to the + internal module interface, and the global output interface is responsible for + outputting the results computed by the pipeline executor to a caller. + """ + return not isinstance(self.io_owner, PipelineConfig.ModuleWrapper) + + def __repr__(self): + # Get all binding information. + ret = " |{}: ".format(self.name) + for binding in self.bindings: + mname, dname = binding.get_name() + ret += "{0}:{1} ".format(mname, dname) + return ret + + def check_dag_acyclic(self, start, inputs): + """This is to check whether the DAG containing these input interfaces is acyclic. + Parameters + ---------- + start: ModuleWrapper + The starting node of the cycle check algorithm. + + inputs: Binding + These interfaces are used to connect to each other to build DAG. + + Return + ------ + Return true if there is no cycle in the DAG. + """ + for binding in inputs.values(): + if start == binding.io_owner: + return False + for p in binding.parents: + if not self.check_dag_acyclic(start, p.io_owner.input_bindings.bindings): + return False + + return True + + def connect(self, binding): + """Connect the current interface to the destination interface. + Correct connections are as follows: 1. global input connected to module input, + 2. module output connected to global output, 3. module output connected to + module input. + + Parameters + ---------- + binding: Binding + The destination of this connection. + """ + + # Check whether the binding setting is correct or not. + if self.io_owner == binding.io_owner: + raise RuntimeError(f"Can not bind itself.") + + if not self.is_global_interface() and self.io_type == "input": + raise RuntimeError(f"Module can only bind from output interface!") + + if ( + not self.is_global_interface() + and not binding.is_global_interface() + and binding.io_type == "output" + ): + raise RuntimeError(f"Can not bind module output with another module output!") + + if ( + not self.is_global_interface() + and binding.is_global_interface() + and binding.io_type == "input" + ): + raise RuntimeError(f"Can not bind module output with global input!") + + if self.is_global_interface() and self.io_type == "output": + raise RuntimeError(f"Global output can not be used as binding start point.") + + if self.is_global_interface() and binding.io_type != "input": + raise RuntimeError(f"Global input can only bind with module input.") + + self.bindings.append(binding) + if not self.is_global_interface(): + # Check whether the data types of the source and destination are the same. + if ( + isinstance(binding.io_owner, PipelineConfig.ModuleWrapper) + and self.data_type != binding.data_type + ): + raise RuntimeError( + f"Illegal type (%s vs. %s): binding type is not same!" + % (self.data_type, binding.data_type) + ) + + binding.parents.append(self) + + # Do acyclic check after increasing the in-degree of child node by setting + # current interface as a parent of the child node. + + if not self.check_dag_acyclic( + binding.io_owner, self.io_owner.input_bindings.bindings + ): + raise RuntimeError(f"Illegal connection: Cause a cycle!") + + class BindingList: + """Container for bindings(input or output interface). + + Parameters + ---------- + owner : ModuleWrapper/PipelineConfig + The owner of this class can be ModuleWrapper or PipelineConfig. + + io_type : str + The type of this class can be "input" or "output". + """ + + def __init__(self, owner, io_type): + self.bindings = {} + self.io_owner = owner + self.binding_type = io_type + + def get_binding_data_type(self, key): + if isinstance(self.io_owner, PipelineConfig.ModuleWrapper): + return self.io_owner.get_data_type(key, self.binding_type) + return None + + def __getitem__(self, key): + if key not in self.bindings: + data_type = self.get_binding_data_type(key) + if not data_type and isinstance(self.io_owner, PipelineConfig.ModuleWrapper): + raise RuntimeError(f"Can not find {key} in binding list {self.binding_type}.") + + self.bindings[key] = PipelineConfig.Binding( + self.io_owner, self.binding_type, key, data_type + ) + + return self.bindings[key] + + class ModuleWrapper: + """This class is a wrapper representing the module and contains information such as + module information, binding information and building information. + """ + + def __init__(self, mod=None): + self.target_host = None + self.build_func = None + self.params = None + self.target = None + self.name = None + self.dev = None + self.idx = None + self.mod = mod + self.input_params = InferType()(mod)["main"].params + self.output_type = InferType()(mod)["main"].checked_type.ret_type + self.input_bindings = PipelineConfig.BindingList(self, "input") + self.output_bindings = PipelineConfig.BindingList(self, "output") + + def __eq__(self, other): + if isinstance(other, PipelineConfig.ModuleWrapper): + return self.mod == other.mod + + return False + + def __getitem__(self, key): + if isinstance(key, str): + if key == "input": + return self.input_bindings + + if key == "output": + return self.output_bindings + + raise RuntimeError(f"{key} not found!") + + def get_data_type(self, key, interface_type): + """Get the module interface data type according to the key value and interface type. + Parameters + ---------- + key: str + The interface name. + + interface_type: + The interface type. + + Return + ------- + Return data type. + """ + if interface_type == "input": + for param in self.input_params: + if param.name_hint == key: + return param._checked_type_ + + if interface_type == "output": + if isinstance(self.output_type, tvm.ir.type.TupleType): + if int(key) < len(self.output_type.fields): + return self.output_type.fields[int(key)] + elif int(key) == 0: + return self.output_type + + return None + + def set_idx_name(self, idx): + # Set the index value and generate the module name. + self.idx = idx + self.name = "mod{}".format(str(idx)) + + def is_root_mod(self): + """Check whether this node is the root node in DAG, this function is used + in topological sort. + """ + return all([not b.parents for b in self.input_bindings.bindings.values()]) + + def remove_self_from_bindings(self): + """Remove the current node from child dependencies to reduce the in-degree + of child node, this function is used in topological sort. + """ + for binding in self.output_bindings.bindings.values(): + for child in binding.bindings: + if binding in child.parents: + child.parents.remove(binding) + + def __init__(self): + self.mod_wrapper = {} + self.input_bindings = self.BindingList(self, "input") + self.output_bindings = self.BindingList(self, "output") + + def __str__(self): + # Get configuration information as a string. + + # Use topological sort to get correct module order. + self.dag_topology_sort() + # Get the input dependencies. + input_dump = "Inputs\n" + for input_name in self.input_bindings.bindings: + inf = self.input_bindings.bindings[input_name] + input_dump += str(inf) + "\n" + + # Get the connections information of each module. + output = {} + connections_dump = "\nconnections\n" + for mod in self.mod_wrapper: + for interface in self.mod_wrapper[mod].output_bindings.bindings.values(): + if interface.bindings: + mname, dname = interface.get_name() + iname = mname + ".output(" + dname + ")->" + for dep in interface.bindings: + dep_mname, dep_dname = dep.get_name() + if isinstance(dep.io_owner, PipelineConfig.ModuleWrapper): + iname += f" {dep_mname}.{dep_dname}" + connections_dump += f" |{iname}\n" + else: + output[dep_dname] = f"{mname}.output({dname})" + + # Get the output dependencies. + output_dump = "\noutput\n" + for name in sorted(output.keys()): + output_dump += f" |output({name}) : {output[name]}\n" + + return input_dump + output_dump + connections_dump + + def __getitem__(self, key): + if isinstance(key, tvm.ir.module.IRModule): + if key not in self.mod_wrapper: + self.mod_wrapper[key] = self.ModuleWrapper(key) + return self.mod_wrapper[key] + + if isinstance(key, str): + if key == "input": + return self.input_bindings + if key == "output": + return self.output_bindings + + raise RuntimeError(f"{key} not found.") + + def get_config(self): + """Get the configuration information in dictionary form, this configuration + will be used to create pipeline executor. + """ + + # Use topological sort to get the correct order of modules. + self.dag_topology_sort() + mconfig = {} + for mod in self.mod_wrapper: + # Generate pipeline configuration. + mconf = {} + output_conf = [] + module = self.mod_wrapper[mod] + for _, binding in module.output_bindings.bindings.items(): + dep_conf = [] + output = {} + if binding.bindings: + for dep in binding.bindings: + dep_item = {} + _, dname = dep.get_name() + dep_item["mod_idx"] = dep.get_owner_idx() + dep_item["input_name"] = dname + dep_conf.append(dep_item) + + # The value of ouput_idx start from 0. + output["output_idx"] = int(binding.name) + output["dependent"] = dep_conf + output_conf.append(output) + + mconf["mod_idx"] = module.idx + mconf["output"] = output_conf + + mconfig[mod] = { + "pipeline": mconf, + "target_host": module.target_host, + "mod_name": "default", + "build": module.build_func, + "params": module.params, + "target": module.target, + "dev": module.dev, + } + + return mconfig + + def dag_topology_sort(self): + """Use topological sort to get order of pipeline modules.""" + mlist = [] + mod_wrapper = self.mod_wrapper.copy() + while mod_wrapper: + temp_list = [] + for mod, wrapper in mod_wrapper.items(): + if wrapper.is_root_mod(): + temp_list.append(mod) + wrapper.remove_self_from_bindings() + + for mod in temp_list: + mod_wrapper.pop(mod, None) + + mlist += temp_list + + for mod, i in zip(mlist, range(len(mlist))): + self.mod_wrapper[mod].set_idx_name(i + 1) + + def get_mod_idx(self, mod): + # Return the module index. + idx = self.mod_wrapper[mod].idx + return idx + + def pipe_input(self, name): + # Return the input interface according to the name. + return self.input_bindings[name] + + def pipe_output(self, idx): + # Return the output interface according to the name. + return self.output_bindings[idx] + + +class PipelineExecutorFactoryModule(object): + """Common interface for pipeline executor factory modules. + + Parameters + ---------- + pipeline_mods : List[GraphExecutorFactoryModule] + List of GraphExecutorFactoryModule. + + mod_config : Dict[int, Dict[str, Any]] + Modules dependency configuration information. + + """ + + def __init__(self, pipeline_mods, mods_config): + mods, config = self.graph_executor_create(pipeline_mods, mods_config) + assert ( + pipeline_executor_enabled() + ), "Pipeline executor is not enabled. Please \ + re-build TVM with USE_PIPELINE_EXECUTOR=ON" + pipeline_create = tvm._ffi.get_global_func( + "tvm.pipeline_executor.create", allow_missing=False + ) + assert pipeline_create + self.module = pipeline_create(mods, config) + + def graph_executor_create(self, pipeline_mods, mod_config): + """Create graph_executor list and return configuration as a json string. + + Parameters + ---------- + pipeline_mods : List[GraphExecutorFactoryModule] + List of GraphExecutorFactoryModule + + mod_config : Dict[str, Any] + Modules dependency configuration information. + + Returns + ------- + mods : List[Module] + The Module list. + + mod_config : str + The Modudle configuration. + """ + + mods = [] + for pipeline_mod in pipeline_mods: + mod = graph_executor.GraphModule( + pipeline_mod["default"](pipeline_mods[pipeline_mod]["dev"]) + ) + mods.append(mod.module) + + return mods, json.dumps(mod_config) diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc new file mode 100644 index 000000000000..41f867057282 --- /dev/null +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/*! + * \file pipeline_executor.cc + */ +#include "pipeline_executor.h" + +namespace tvm { +namespace runtime { + +void PipelineRuntime::Init(const Array& modules, + const std::string& pipeline_json) { + return; +} + +/* GetFunction can not be pure abstract function, implement an empty function for now. + */ +PackedFunc PipelineRuntime::GetFunction(const std::string& name, + const ObjectPtr& sptr_to_self) { + return nullptr; +} + +Module PipelineRuntimeCreate(const Array& m, + const std::string& pipeline_json) { + auto exec = make_object(); + exec->Init(m, pipeline_json); + return Module(exec); +} + +TVM_REGISTER_GLOBAL("tvm.pipeline_executor.create").set_body([](TVMArgs args, TVMRetValue* rv) { + *rv = PipelineRuntimeCreate(args[0], args[1]); +}); +} // namespace runtime +} // namespace tvm diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h new file mode 100644 index 000000000000..c7625c62b724 --- /dev/null +++ b/src/runtime/pipeline/pipeline_executor.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/*! + * \brief pipeline executor + * \file pipeline_executor.h + */ +#ifndef TVM_RUNTIME_PIPELINE_PIPELINE_EXECUTOR_H_ +#define TVM_RUNTIME_PIPELINE_PIPELINE_EXECUTOR_H_ +#include + +#include +namespace tvm { +namespace runtime { +/*! + * \brief pipeline executor. + * This executor class use the module list and dependency configuration of modules as + * the parameters and executes these modules on heterogeneous targets in a pipeline + * parallel manner to improve throughput. + * + * This executor can be accessed by various language via TVM runtime PackedFunc API. + */ +class TVM_DLL PipelineRuntime : public ModuleNode { + public: + /*! + * \Return the type key of the executor. + */ + const char* type_key() const final { return "PipelineRuntime"; } + /*! + * \brief Initialize the pipeline executor with module array and json text. + * \param modules The module list used for building pipeline. + * \param pipeline_json The configuration of modules dependencies. + */ + void Init(const Array& modules, const std::string& pipeline_json); + /*! + * \brief Give frontends an access to packed functions. + * \param name The name of the function. + * \param sptr_to_self The pointer to the module node. + * \return The corresponding packed function. + */ + virtual PackedFunc GetFunction(const std::string& name, const ObjectPtr& sptr_to_self); +}; +} // namespace runtime +} // namespace tvm +#endif // TVM_RUNTIME_PIPELINE_PIPELINE_EXECUTOR_H_ diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py new file mode 100644 index 000000000000..d9411c92c375 --- /dev/null +++ b/tests/python/relay/test_pipeline_executor.py @@ -0,0 +1,239 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +import numpy as np +import tvm +import tvm.testing +from tvm import relay +from tvm.relay import transform +from tvm.contrib import graph_executor, pipeline_executor + + +def get_mannual_mod(): + # Get a list of modules representing subgraphs. + mods = [] + dshape = (3, 3) + data = relay.var("data_0", relay.TensorType(dshape, "float32")) + data21 = relay.var("data_1", relay.TensorType(dshape, "float32")) + data_net1_output_1 = relay.var("data_0", relay.TensorType(dshape, "float32")) + data_net1_output_2 = relay.var("data_1", relay.TensorType(dshape, "float32")) + data_net2_output_1 = relay.var("data_0", relay.TensorType(dshape, "float32")) + mvalue1 = np.full((1), 1).astype("float32") + mvalue2 = np.full((1), 2).astype("float32") + mvalue3 = np.full((1), 3).astype("float32") + mv1 = relay.Constant(tvm.nd.array(mvalue1)) + mv2 = relay.Constant(tvm.nd.array(mvalue2)) + mv3 = relay.Constant(tvm.nd.array(mvalue3)) + + # There are three outputs in the first model. + + net1_output1 = relay.add(data, mv1) + net1_output2 = relay.subtract(data, mv2) + net1_output3 = relay.multiply(data, mv3) + + # The second model use output named net1_output1 of the first model as the first input, + # the second input of the second model is data21. + net2 = relay.add(data_net1_output_1, mv2) + net2 = relay.add(net2, data21) + net2_output = relay.add(net2, mv3) + + # The third model use the output named net2_output of the second model as the first input + # and use the output named net1_output2 of the first model as the second input. + net3 = relay.multiply(data_net2_output_1, mv3) + net3 = relay.add(net3, data_net1_output_2) + + mods.append( + tvm.IRModule.from_expr( + relay.Function([data], relay.Tuple([net1_output1, net1_output2, net1_output3])) + ) + ) + mods.append(tvm.IRModule.from_expr(relay.Function([data_net1_output_1, data21], net2_output))) + mods.append( + tvm.IRModule.from_expr(relay.Function([data_net1_output_2, data_net2_output_1], net3)) + ) + + return mods, dshape + + +def get_manual_conf(mods, target): + # This function is used to generate manual pipeline configuration. + mod_config = {} + # The third output is the final output, the second output is for mod3, the first output + # is for mod2 input. + pipe_config1 = { + "mod_idx": 1, + "output": [ + {"output_idx": 0, "dependent": [{"mod_idx": 2, "input_name": "data_0"}]}, + {"output_idx": 1, "dependent": [{"mod_idx": 3, "input_name": "data_0"}]}, + {"output_idx": 2, "dependent": [{"mod_idx": 0, "input_name": "0"}]}, + ], + } + mod_config[mods[0]] = { + "pipeline": pipe_config1, + "target_host": None, + "mod_name": "default", + "build": None, + "params": None, + "target": target[0], + "dev": target[1], + } + + pipe_config2 = { + "mod_idx": 2, + "output": [ + {"output_idx": 0, "dependent": [{"mod_idx": 3, "input_name": "data_1"}]}, + ], + } + mod_config[mods[1]] = { + "pipeline": pipe_config2, + "target_host": None, + "mod_name": "default", + "build": None, + "params": None, + "target": "llvm", + "dev": tvm.cpu(0), + } + + pipe_config3 = { + "mod_idx": 3, + "output": [{"output_idx": 0, "dependent": [{"mod_idx": 0, "input_name": "1"}]}], + } + mod_config[mods[2]] = { + "pipeline": pipe_config3, + "target_host": None, + "mod_name": "default", + "build": None, + "params": None, + "target": "llvm", + "dev": tvm.cpu(0), + } + return mod_config + + +def test_pipe_config_check(): + # This function is used to trigger runtime error by applying wrong logic connection. + + # Get the three pipeline modules here. + (mod1, mod2, mod3), dshape = get_mannual_mod() + + # The input or output name is illegal and expects a runtime error. + pipe_error = pipeline_executor.PipelineConfig() + with pytest.raises(RuntimeError): + pipe_error[mod1]["output"][9] + + with pytest.raises(RuntimeError): + pipe_error[mod1]["input"]["data_9"] + + # The module connection will cause a cycle in DAG and expects runtime error. + with pytest.raises(RuntimeError): + pipe_error[mod1]["output"][0].connect(pipe_error[mod2]["input"]["data_0"]) + pipe_error[mod2]["output"][0].connect(pipe_error[mod1]["input"]["data_0"]) + + # The module connection is illegal and expects runtime error. + + with pytest.raises(RuntimeError): + pipe_error[mod1]["output"][0].connect(pipe_error[mod1]["input"]["data_0"]) + + with pytest.raises(RuntimeError): + pipe_error[mod1]["input"]["data_0"].connect(pipe_error[mod1]["input"]["data_0"]) + + with pytest.raises(RuntimeError): + pipe_error[mod1]["input"]["data_0"].connect(pipe_error[mod2]["input"]["data_0"]) + + with pytest.raises(RuntimeError): + pipe_error[mod1]["output"][0].connect(pipe_error["input"]["data_0"]) + + with pytest.raises(RuntimeError): + pipe_error["input"]["data_0"].connect(pipe_error[mod1]["output"][0]) + + with pytest.raises(RuntimeError): + pipe_error["output"]["0"].connect(pipe_error[mod1]["output"][0]) + + +def test_pipeline(): + if pipeline_executor.pipeline_executor_enabled(): + target_list = tvm.testing.enabled_targets() + for target in target_list: + # Get the three pipeline modules here. + (mod1, mod2, mod3), dshape = get_mannual_mod() + + # Prepare batch data for pipeline computation. + datas = [] + for i in range(5): + datas.append(np.full(dshape, 3 + i).astype("float32")) + + pipe_config = pipeline_executor.PipelineConfig() + + # The global input named "data_0" will be connected to a input named "data_0" of mod1. + pipe_config["input"]["data_0"].connect(pipe_config[mod1]["input"]["data_0"]) + + # The global Input named "data_1" will be connected to a input named "data_1" of mod2. + pipe_config["input"]["data_1"].connect(pipe_config[mod2]["input"]["data_1"]) + + # The mod1 output[0] will be connected to a input named "data_0" of mod2. + pipe_config[mod1]["output"][0].connect(pipe_config[mod2]["input"]["data_0"]) + + # The mod1 output[1] will be connected to a input named "data_0" of mod3. + pipe_config[mod1]["output"][1].connect(pipe_config[mod3]["input"]["data_0"]) + + # The mod2 output[2] will be connected to a input named "data_1" of mod3. + pipe_config[mod2]["output"][0].connect(pipe_config[mod3]["input"]["data_1"]) + + # The mod1 output[2] will be connected to global output[1]. + pipe_config[mod1]["output"][2].connect(pipe_config["output"]["0"]) + + # The mod3 output[0] will be connected to global output[2]. + pipe_config[mod3]["output"][0].connect(pipe_config["output"]["1"]) + # Print configueration (print(pipe_config)), the result looks like following. + # + # Inputs + # |data_0: mod1:data_0 + # |data_1: mod2:data_1 + # + # output + # |output(1) : mod1.output(2) + # |output(2) : mod3.output(0) + # + # connections + # |mod1.output(0)-> mod2.data_0 + # |mod1.output(1)-> mod3.data_0 + # |mod2.output(0)-> mod3.data_1 + + # Set other parameters. + pipe_config[mod1].target = target[0] + pipe_config[mod1].dev = target[1] + + pipe_config[mod2].target = "llvm" + pipe_config[mod2].dev = tvm.cpu(0) + + pipe_config[mod3].target = "llvm" + pipe_config[mod3].dev = tvm.cpu(0) + + # Here is to check the correctness of the configuration generated by API. + assert pipe_config.get_config() == get_manual_conf([mod1, mod2, mod3], target) + + # Build and create a pipeline module. + with tvm.transform.PassContext(opt_level=3): + pipeline_mod_factory = pipeline_executor.build(pipe_config) + + pipeline_module = pipeline_executor.PipelineModule(pipeline_mod_factory) + assert pipeline_module + + +if __name__ == "__main__": + pytest.main([__file__])