Skip to content

Commit

Permalink
address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
huajsj committed Aug 9, 2021
1 parent 54bd994 commit cd7a44f
Showing 1 changed file with 58 additions and 208 deletions.
266 changes: 58 additions & 208 deletions rfcs/0012-pipeline-executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
<!--- KIND, either express or implied. See the License for the -->
<!--- specific language governing permissions and limitations -->
<!--- under the License. -->
- Feature Name: (fill me in with a unique identifier, `my_awesome_feature`)
- Start Date: (fill me in with today's date, YYYY-MM-DD)
- Feature Name: Pipeline Executor
- Start Date: 2021-07-30
- RFC PR: [apache/tvm-rfcs#0014](https://github.com/apache/tvm-rfcs/pull/0014)
- GitHub Issue: [apache/tvm#8596](https://github.com/apache/tvm/issues/8596)

Expand All @@ -24,7 +24,7 @@

This proposal introduces Pipeline Executor: A runtime executor that by scheduling
splitted subgraph of relay graph in pipeline to implement task level parallism to
reduce compute latency.
improve compute throughput.

## 2. Motivation

Expand Down Expand Up @@ -53,230 +53,80 @@ Pipeline Executor provides:

* Use RPC to perform distributed computation cross multiple remote devices.

* Users can use Pipeline Executor to integrate pre-compute processing and post-processing with
network compute together and compute in same executor.
* Pipeline executor provide the capability to integrate non-DNN model function.

## 3. Guide-level explanation
Pipeline Executor is a runtime executor which implements pipeline execution logic for multiple
subgraphs and relies on graph_executor for operator storage and execution.

This section introduces the use case for Pipeline Executor.

* 1. Manually constructing pipeline subgraphs from a network compute graph.
* 2. Manually constructing pipeline subgraph configuration for dependency and target device.
* 3. Use pipeline_executor to build a pipeline module with the subgraphs and configuration.
* 4. Use pipeline_executor to load the pipeline module to run network in pipeline parallelism mode.
* 1. Using Automatic Graph Split feature to construct pipeline subgraph and configuration.
* 2. Use pipeline_executor to build a pipeline module with the subgraphs and configuration.
* 3. Use pipeline_executor to load the pipeline module to run network in pipeline parallelism mode.

### 3.1. Manually constructing pipeline subgraph from a network compute graph.
A pipeline subgraph is subgraph of network compute graph. Every pipeline subgraph from a network
have data dependency, and runs on different backends. The purpose of splitting a network into
pipeline subgraphs is to do network compute on different compute units and pipeline them to improve
compute throughput. Following is an example for network compute graph splitting.
### 3.1. Using Automatic Graph Split feature to construct pipeline subgraph and configuration.

```python
import tvm
from ...ir import IRModule
from ...relay import transform, build_module
def pipeline_graph(expr, indices):
"""Split Graph Into A Group Of Subgraph
Parameters
----------
expr : tvm.relay.Expr
indices : Array[int]
Returns
-------
ret : Array[tvm.relay.IRModule]
"""

def run_opt_pass(expr, opt_pass):
"""Exectue a relay pass"""
assert isinstance(opt_pass, tvm.transform.Pass)
mod = tvm.IRModule.from_expr(expr)
mod = tvm.relay.transform.InferType()(mod)
mod = opt_pass(mod)
entry = mod["main"]
return entry if isinstance(expr, tvm.relay.Function) else entry.body

def _operator_idx_inc(expr, operator_current_idx):
"""Increase operator index"""
if not isinstance(expr, tvm.relay.expr.Constant):
operator_current_idx = operator_current_idx + 1

return operator_current_idx

def merge_constant_expr(constant_expr, expr):
# merge constant express with a express
# Parameters
# ----------
# constant_expr:
# constant expression
# expr:
# expression to merge with constant expression

# If body not let, then reached end of the express
if not isinstance(constant_expr.body, tvm.relay.expr.Let):
return tvm.relay.expr.Let(constant_expr.var, constant_expr.value, expr)

return tvm.relay.expr.Let(
constant_expr.var, constant_expr.value, merge_constant_expr(constant_expr.body, expr)
)

def _recursion(anf, operator_indx, pipeline_mods, indices, constant_expr):
# Enumrate all operator of compute graph then split the compute graph
# into a group subgraph.
# Parameters
# ----------
# anf:
# ANF format expression
# operator_indx:
# current operator indice
# pipeline_mods:
# the subgraph list get storage in this variable
# indices:
# Array of indices use to define the subgraph scope
# constant_expr:
# constant defined before current operator

# Do the split work
if isinstance(anf, tvm.relay.Function):
return tvm.relay.Function(
anf.params,
_recursion(anf.body, operator_indx, pipeline_mods, indices, constant_expr),
anf.ret_type,
anf.type_params,
anf.attrs,
)
if isinstance(anf, tvm.relay.expr.Let):
value = anf.value
operator_indx = _operator_idx_inc(value, operator_indx)

# record constan expr to make sure all sugraph can find correct
# constant.
if isinstance(value, tvm.relay.expr.Constant):
if not constant_expr:
constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var)
else:
constant_expr = tvm.relay.expr.Let(anf.var, value, constant_expr)

if isinstance(value, tvm.relay.expr.Call):
if isinstance(value.op, tvm.ir.Op):

# if have expr a(b(c(d(e)))) and indexes are [1,2,3]
# then would get separate modules for a(b),c,d(e).
# the split area is a(b)[0,1] c[2,2] d(e)[2,3]
if indices and operator_indx == indices[0]:
indices.pop(0)
ann = _recursion(
anf.body, operator_indx, pipeline_mods, indices, constant_expr
)

# when current subgraph use previous subgraph constant,
# such constant may become free varaible due to the constant
# not exist, merge the previous constant with current subgraph
# to avoid such issue.
if constant_expr:
ann = merge_constant_expr(constant_expr, ann)

ann = run_opt_pass(ann, transform.ToGraphNormalForm())
mod = tvm.IRModule.from_expr(ann)
pipeline_mods.insert(0, mod)
return tvm.relay.expr.Let(anf.var, value, anf.var)
return tvm.relay.expr.Let(
anf.var,
value,
_recursion(anf.body, operator_indx, pipeline_mods, indices, constant_expr),
)
else:
return anf

pipeline_mods = []

# operator count start from 0, then initial value get set into -1
operator_indx = -1
constant_expr = None
subgraph_indices = indices.copy()
anf = run_opt_pass(expr, transform.ToANormalForm())
anf = run_opt_pass(anf, transform.InferType())
ann = _recursion(anf, operator_indx, pipeline_mods, subgraph_indices, constant_expr)
ann = run_opt_pass(ann.body, transform.ToGraphNormalForm())
mod = tvm.IRModule.from_expr(ann)
pipeline_mods.insert(0, mod)
return pipeline_mods
This feature not in this RFC scope. the logic as following.

#...
mod, params = relay.frontend.from_darknet(net, dtype=dtype, shape=dshape)
split = [11, 22]
mods = pipeline_graph(mod["main"], split)
```
this solution include 3 steps, 1. Operator Auto tune, 2. Graph dependency tree build and balance,
3. Graph Auto Tune. following are more detail.

### 3.2. Manually constructing pipeline subgraph configuration for dependency and target device
There are dependencies between pipeline subgraphs. For example, we have 3 pipeline subgraphs named
`s1`, `s2`, and `s3`. Their dependencies are `s1 -> s2 -> s3`. Users need to construct a configuation
file to describe such relation. The configuration also needs to involve "target" and "device" information
as the following example.
#### 3.1.1 Operator Auto Tune :

```python
mconfig = {"target_host": None, "mod_name": "default", "build": None, "params": None}
mconfig1 = mconfig.copy()
mconfig1["target"] = "cuda"
mconfig1["dev"] = tvm.gpu[0]
# third output is final output, second output for mod3, first for mod2
# input
mconfig1["pipeline"] = {
"mod_indx": 1,
"output": [
{"output_indx": 1, "dependent": [{"mod_indx": 2, "input_name": "data_0"}]},
{"output_indx": 2, "dependent": [{"mod_indx": 3, "input_name": "data_0"}]},
{"output_indx": 3, "dependent": [{"mod_indx": 0, "input_name": "1"}]},
],
}
mod_config[mods[0]] = mconfig1

mconfig2 = mconfig.copy()
mconfig2["target"] = "llvm"
mconfig2["dev"] = tvm.cpu(0)
mconfig2["pipeline"] = {
"mod_indx": 2,
"output": [
{"output_indx": 1, "dependent": [{"mod_indx": 3, "input_name": "data_1"}]},
],
}
mod_config[mods[1]] = mconfig2

mconfig3 = mconfig.copy()
mconfig3["target"] = "llvm"
mconfig3["dev"] = tvm.cpu(0)

mconfig3["pipeline"] = {
"mod_indx": 3,
"output": [{"output_indx": 1, "dependent": [{"mod_indx": 0, "input_name": "2"}]}],
}
mod_config[mods[2]] = mconfig3
```

### 3.3. Use pipeline_executor to build pipeline module with the said subgraph and configuration.
* a. In operator Auto tune tune section, user would using existing tuning logic to tune the every operator,
but the tune would separately and serialized happen in all target involved by pipeline executor.

* b. After operator tune done , here can get performance data, for example , con2d_0 best perf in
GPU is 3ms, in VTA is 2ms etc, this perf data would get used in later Graph dependency tree build
balance step.

#### 3.1.2. Graph dependency tree build balance

* a. Initialize a DAG, the node of the DAG is subgraph, initially for a N node DAG, first [1, N -1] node mapping to
[1 , N-1] layer(compute density operator and others) of original compute graph, the number N node is
mapping to [N, M] layer , M here is the original compute layer number.

* b. by using the perf data generated in 3.1.1.b , every dependency tree node can get a time consume value,
the time consume value for difference node not at beginning is not same, then we call this DAG is not balanced in
weight of node, by using the way to adjust the node(subgraph) scope(how many operator in this node), we make
every node of the DAG become same or value closed on weight(same time consume), then such DAG is a graph split
solution,
here we use DAG is to record the parent/child relation that child only can run after parent runned, and the scope
adjustment only can hapen between parent and child.

### 3.1.3 Graph Auto Tune.
* a. 3.1.2 can generate more than one subgraph split solution DAG, in this step, Graph Auto Tune would try these
multiple solution to get best configuration.

after 1. 2. 3. , here can get an automatic graph split configuration.

### 3.2. Use pipeline_executor to build pipeline module with the said subgraph and configuration.

Pipeline executor provide a build function to compile and save the compile output into disk,
following is a example

```python
with relay.build_config(opt_level=3):
pipeline_mods, string_config = pipeline_executor.build_pipeline(
mod_config, "<path to storage the build output>"
)
with autotvm.get_pipeline_model_best(mod_file) as mod_config: # this is future feature not in this RFC
with relay.build_config(opt_level=3):
lib = pipeline_executor.build_pipeline(mod_config)

```

### 3.4. Use pipeline_executor to load pipeline module to run network in pipeline parallism mode.
### 3.3. Use pipeline_executor to load pipeline module to run network in pipeline parallism mode.

Pipeline executor works asynchronously. Unlike the graph executor that launches a task by calling a blocking
`run` API, we can kick off a task by calling a non-blocking `set_input` API in pipeline executor:
set_input--> run
set_input--> run
get_ouput
set_input-->run
get_output
get_output

set_input: queue the input in the buffer.
run: run with the input at the front.
set_input: queue the input in the buffer.
run: run with the input at the front.
get_output
set_input: queue the input in the buffer.
run: run with the input at the front.
get_output
get_output

`get_output` can be called anytime, and it will return an empty array if no output is ready.

Expand All @@ -300,10 +150,9 @@ pipeline_outputs = []
datas = []
for i in range(len(mods) + 1):
datas.append(np.full(dshape, 3 + i).astype("float32"))
pipeline_module = pipeline_executor.create(pipeline_mods, string_config)
pipeline_module = pipeline_executor.create(lib)

for data in datas:
get_output(pipeline_outputs, pipeline_module)
pipeline_module.set_input("data_0", data)
pipeline_module.set_input("data_1", data, mod_idx=2)
pipeline_module.run()
Expand Down Expand Up @@ -343,8 +192,9 @@ Further graph splitting feature would do automatically split.
## 6. Rationale and alternative


Compute graph can get split into subgraph and pipeline execution can implement parallism
when these subgraph have dependency relation.
whithout pipeline executor, current tvm still can run network in Heterogeneous hardware but
that running is serialized instead of parallel run operator in different hardware



## 7. Prior art
Expand Down

0 comments on commit cd7a44f

Please sign in to comment.