Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Runtime][PipelineExecutor] Tutorial of using pipeline executor. #11557

Merged
merged 39 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
8bf383e
[Runtime][PipelineExecutor] Tutorial of using pipeline executor.
huajsj May 28, 2022
6332de0
fix ci issue
huajsj Jun 3, 2022
cb49f99
document change.
huajsj Jun 3, 2022
226fc58
triger build
huajsj Jun 3, 2022
031b3ad
fix doc issue
huajsj Jun 5, 2022
d046177
fix ci issue
huajsj Jun 5, 2022
8d01a7f
doc issue
huajsj Jun 5, 2022
86cfbe4
fix ci issue
huajsj Jun 8, 2022
22788ba
fix ci issue.
huajsj Jun 8, 2022
9a550fb
fix __file__ not found problem.
huajsj Jun 8, 2022
1b53258
fix byoc with dnnl issue
huajsj Jun 9, 2022
7757b1b
enable dnnl and pipeline executor
huajsj Jun 9, 2022
15db48a
trigger build
huajsj Jun 9, 2022
3b02c9a
trigger build
huajsj Jun 10, 2022
0811b24
fix build issue
huajsj Jun 21, 2022
53894ec
trigger build
huajsj Jun 22, 2022
fb4f821
oneflow cause crash, do test with change
huajsj Jul 5, 2022
e259798
add sphinx skip
huajsj Jul 5, 2022
b70a731
plint
huajsj Jul 5, 2022
215a2bd
remove from_oneflow change test.
huajsj Jul 5, 2022
bc6e863
remove pipeline executor change for test
huajsj Jul 5, 2022
7709974
plint
huajsj Jul 5, 2022
745ec3b
enable DNNL and pipeline
huajsj Jul 6, 2022
e14d431
disable DNNL
huajsj Jul 6, 2022
6640dd6
enable DNNL without pipeline
huajsj Jul 6, 2022
f5b61fd
remove dnnl and add cutlass
huajsj Jul 12, 2022
50a7eb9
use cutlass with byoc
huajsj Jul 12, 2022
0b30034
change into cutlass
huajsj Jul 17, 2022
873e027
fix doc convention issue
huajsj Jul 17, 2022
73656af
remove duplicate variable
huajsj Jul 17, 2022
e4d8360
fix plint issue.
huajsj Jul 17, 2022
cfd2af2
address review comments.
huajsj Jul 21, 2022
a1fc852
address review comments
huajsj Jul 21, 2022
60c8953
fix bug.
huajsj Jul 21, 2022
420e951
polish the document
huajsj Jul 21, 2022
b998f12
fix plint issue
huajsj Jul 21, 2022
1a930af
address review comments.
huajsj Jul 21, 2022
7449ff7
address review comments
huajsj Jul 22, 2022
0dcc5bf
address review comments
huajsj Jul 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions gallery/how_to/work_with_relay/using_pipeline_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Using Pipeline Executor in Relay
=================================
**Author**: `Hua Jiang <https://https://github.com/huajsj>`_

This is a short tutorial on how to use "Pipeline Executor" with Relay.
"""
import tvm
from tvm import te
import numpy as np
from tvm.contrib import graph_executor as runtime
from tvm.relay.op.contrib.cutlass import partition_for_cutlass
from tvm import relay
from tvm.relay import testing
import tvm.testing
from tvm.contrib.cutlass import (
has_cutlass,
num_cutlass_partitions,
finalize_modules,
finalize_modules_vm,
)

img_size = 8
#######################################################################
# Create a simple network, this network can be a pre-trained model too.
# ---------------------------------------------------------------------
# Let's create a very simple network for demonstration.
# It consists of convolution, batch normalization, dense, and ReLU activation.
def get_network():
out_channels = 16
batch_size = 1
data = relay.var("data", relay.TensorType((batch_size, 3, img_size, img_size), "float16"))
dense_weight = relay.var(
"dweight", relay.TensorType((batch_size, 16 * img_size * img_size), "float16")
)
weight = relay.var("weight")
second_weight = relay.var("second_weight")
bn_gamma = relay.var("bn_gamma")
bn_beta = relay.var("bn_beta")
bn_mmean = relay.var("bn_mean")
bn_mvar = relay.var("bn_var")
simple_net = relay.nn.conv2d(
data=data, weight=weight, kernel_size=(3, 3), channels=out_channels, padding=(1, 1)
)
simple_net = relay.nn.batch_norm(simple_net, bn_gamma, bn_beta, bn_mmean, bn_mvar)[0]
simple_net = relay.nn.relu(simple_net)
simple_net = relay.nn.batch_flatten(simple_net)
simple_net = relay.nn.dense(simple_net, dense_weight)
simple_net = relay.Function(relay.analysis.free_vars(simple_net), simple_net)
data_shape = (batch_size, 3, img_size, img_size)
net, params = testing.create_workload(simple_net)
return net, params, data_shape


net, params, data_shape = get_network()
###########################################
# Splitting the network into two subgraphs.
# -----------------------------------------
# It is an example that the graph splitting function comes from a unit test. User can create a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first sentence is broken and makes no sense..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed into “This function called 'graph_split' from a unit test is just an example. User can create a customized logic to split the graph.”

# customized function to split the graph.
import inspect
import os

test_path = os.path.dirname(inspect.getfile(lambda: None))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can simply use __file__ here instead of inspect. And rename test_path to tutorial_dir.

Copy link
Contributor Author

@huajsj huajsj Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace "test_path" with "tutorial_dir",
the reason we use inspect instead of file is because that __file__ not work with sphinx-gallery which is used by tvm doc
huajsj@8d2bfc3

os.sys.path.append(os.path.join(test_path, "../../../tests/python/relay"))
from test_pipeline_executor import graph_split

###########################################
# Splitting the network into two subgraphs.
split_config = [{"op_name": "nn.relu", "op_index": 0}]
subgraphs = graph_split(net["main"], split_config, params)
###########################################################
# The generated subgraphs should look something like below.

"""
#subgraphs[0])

def @main(%data: Tensor[(1, 3, img_size, img_size), float16]) {
%0 = nn.conv2d(%data, meta[relay.Constant][0] /* ty=Tensor[(16, 3, 3, 3), float16] */, padding=[1, 1, 1, 1], channels=16, kernel_size=[3, 3]) /* ty=Tensor[(1, 16, img_size, img_size), float16] */;
%1 = nn.batch_norm(%0, meta[relay.Constant][1] /* ty=Tensor[(16), float16] */, meta[relay.Constant][2] /* ty=Tensor[(16), float16]*/, meta[relay.Constant][3] /* ty=Tensor[(16), float16] */, meta[relay.Constant][4] /* ty=Tensor[(16), float16] */) /* ty=(Tensor[(1,16, img_size, img_size), float16], Tensor[(16), float16], Tensor[(16), float16]) */;
%2 = %1.0;
nn.relu(%2) /* ty=Tensor[(1, 16, img_size, img_size), float16] */
}

#subgraphs[1]

def @main(%data_n_0: Tensor[(1, 16, 8, 8), float16] /* ty=Tensor[(1, 16, 8, 8), float16] */) {
%0 = nn.batch_flatten(%data_n_0) /* ty=Tensor[(1, 1024), float16] */;
nn.dense(%0, meta[relay.Constant][0] /* ty=Tensor[(1, 1024), float16] */, units=None) /* ty=Tensor[(1, 1), float16] */
}

"""

# sphinx_gallery_start_ignore
from tvm import testing

testing.utils.install_request_hook(depth=3)
# sphinx_gallery_end_ignore

#########################################
# Build the subgraph with cutlass target.
# ---------------------------------------

cutlass = tvm.target.Target(
{
"kind": "cutlass",
"sm": int(tvm.target.Target("cuda").arch.split("_")[1]),
"use_3xtf32": True,
"split_k_slices": [1],
"profile_all_alignments": False,
"find_first_valid": True,
"use_multiprocessing": True,
"use_fast_math": False,
"tmp_dir": "./tmp",
},
host=tvm.target.Target("llvm"),
)


def cutlass_build(mod, target, params=None, target_host=None, mod_name="default"):
target = [target, cutlass]
lib = relay.build_module.build(
mod, target=target, params=params, target_host=target_host, mod_name=mod_name
)
return lib


###########################################################
# Run the two subgraphs in pipeline with pipeline executor.
# ---------------------------------------------------------
# Set 'USE_PIPELINE_EXECUTOR' as ON, and set USE_CUTLASS' as ON in cmake.
from tvm.contrib import graph_executor, pipeline_executor, pipeline_executor_build

#########################################
# Create subgraph pipeline configuration.
# Associate a subgraph module with a target.
# Use CUTLASS BYOC to build the second subgraph module.
mod0, mod1 = subgraphs[0], subgraphs[1]
# Use cutlass as the codegen.
mod1 = partition_for_cutlass(mod1)
#################################################
# Get the pipeline executor configuration object.
pipe_config = pipeline_executor_build.PipelineConfig()
###########################################################################
# Set the compile target of the subgraph module.
pipe_config[mod0].target = "llvm"
pipe_config[mod0].dev = tvm.cpu(0)
###############################################################################
# Set the cpu afinity for control flow, for example using cpu 0 for control flow.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clarify what is meant by "control flow", and why we need to do this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when we run backend with executor for example cutlass, both cpu and gpu would get involved for the execution, cpu part response for preparing data, pre/post processing, transfer data between layer etc, I call this part as control flow.

under multiple backend situation, for example in this tutorial that is LLVM + CUTLASS, the 2 control flow will compete the cpu resource, and cause a lot of thread context switch, or cpu migration. These type resource competing will slow down the performance. by using the affinity setting, we associate a backend to a particular cpu group to avoid the said overhead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"control flow" usually means if/else or for loop in TVM or in general. How about "host operations"?

This also doesn't sound like something most users should be concerned about. I suggest removing affinity stuff from the tutorial and set the default affinity inside some runtime function. If you require affinity control by users, please summarize and add what you said above to the tutorial with correct English.

pipe_config[mod1].cpu_affinity = "0"
pipe_config[mod1].export_cc = None
huajsj marked this conversation as resolved.
Show resolved Hide resolved
##############################################################
# Set the compile target of the second subgraph module as cuda.
pipe_config[mod1].target = "cuda"
pipe_config[mod1].dev = tvm.device("cuda", 0)
pipe_config[mod1].build_func = cutlass_build
pipe_config[mod1].export_cc = "nvcc"
#################################################################################
# Set the cpu afinity for control flow, for example using cpu 1 for control flow.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: afinity

Copy link
Contributor Author

@huajsj huajsj Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the affinity and use default tvm threadpoll default affinity logic.

pipe_config[mod1].cpu_affinity = "1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipe_config[mod1].cpu_affinity is written twice, here and at L166.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

pipe_config["input"]["data"].connect(pipe_config[mod0]["input"]["data"])
pipe_config[mod0]["output"][0].connect(pipe_config[mod1]["input"]["data_n_0"])
pipe_config[mod1]["output"]["0"].connect(pipe_config["output"][0])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these three lines related to affinity control? You should have another ######## before them and explain what they do.

I have to say, this is not a good API. For example, where the names "data" and "data_n_0" come from? What is pipe_config[mod0]["output"][0]? And why you use "0" at L178?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these three line related connect subgraph to build pipeline instead of affinity, added detail explain.

"data" and "data_n_0" coming from subgraphs which is a list of subgraph, by print(subgraph[0]) , print(subgraph[1]) the said "data" and "data_n_0" will shown. if here give a wrong name which not exist , the API will throw a error.

pipe_config[mod0]["output"][0] means "the first output interface" of "mod0", line 178 "0" is typo , fixed.

######################################
# The pipeline configuration as below.
"""
print(pipe_config)
Inputs
|data: mod0:data

output
|output(0) : mod1.output(0)

connections
|mod0.output(0)-> mod1.data_n_0
"""

# sphinx_gallery_start_ignore
from tvm import testing

# testing.utils.install_request_hook(depth=3)
# sphinx_gallery_end_ignore
##############################
# Build the pipeline executor.
# ----------------------------
with tvm.transform.PassContext(opt_level=3):
pipeline_mod_factory = pipeline_executor_build.build(pipe_config)
###############################################
# Export the parameter configuration to a file.
directory_path = tvm.contrib.utils.tempdir().temp_dir
os.makedirs(directory_path, exist_ok=True)
config_file_name = pipeline_mod_factory.export_library(directory_path)
################################################################
# Use the load function to create and initialize PipelineModule.
# --------------------------------------------------------------
pipeline_module = pipeline_executor.PipelineModule.load_library(config_file_name)

############################
# Run the pipeline executor.
# --------------------------
# Allocate input data.
data = np.random.uniform(-1, 1, size=data_shape).astype("float16")
pipeline_module.set_input("data", tvm.nd.array(data))
##########################################################################
# Run the two subgraph in the pipeline mode to get the output asynchronously
# or synchronously. In the following example, it is synchronous.
pipeline_module.run()
outputs = pipeline_module.get_output()
######################################
# Use graph_executor for verification.
# ------------------------------------
# Run these two subgraphs in sequence with graph_executor to get the output.
target = "llvm"
dev0 = tvm.device(target, 0)
lib0 = relay.build_module.build(mod0, target, params=params)
module0 = runtime.GraphModule(lib0["default"](dev0))
cuda = tvm.target.Target("cuda", host=tvm.target.Target("llvm"))
lib1 = relay.build_module.build(mod1, [cuda, cutlass], params=params)
lib1 = finalize_modules(lib1, "compile.so", "./tmp")

dev1 = tvm.device("cuda", 0)

module1 = runtime.GraphModule(lib1["default"](dev1))

module0.set_input("data", data)
module0.run()
out_shape = (1, 16, img_size, img_size)
out = module0.get_output(0, tvm.nd.empty(out_shape, "float16"))
module1.set_input("data_n_0", out)
module1.run()
out_shape = (1, 1)
out = module1.get_output(0, tvm.nd.empty(out_shape, "float16"))
####################
# Verify the result.
tvm.testing.assert_allclose(outputs[0].numpy(), out.numpy())
26 changes: 22 additions & 4 deletions python/tvm/contrib/pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Pipeline executor that executes a series of modules in a pipeline fashion."""
import json
import os
import time
from tvm import runtime
from tvm._ffi import get_global_func
from tvm.contrib import graph_executor
Expand Down Expand Up @@ -131,14 +132,26 @@ def get_input(self, key):
"""
return self._get_input(key)

def get_output(self):
def get_output(self, synchronize=True, sleep_interval=0.001):
"""Get the output.
Returns
-------
data : Array[NDArray]
A list of output data.
synchronize : BOOL
Whether to do a synchronize poll.
sleep_interval : Float32
When doing the synchronize loop poll, how many seconds the loop should sleep for yield.
"""
return self._get_output()
outputs = []
if not synchronize:
outputs = self._get_output()
else:
while not outputs:
outputs = self._get_output()
time.sleep(sleep_interval)

return outputs

@property
def num_executing_pipeline(self):
Expand Down Expand Up @@ -302,11 +315,16 @@ def export_library(self, directory_path):
self.pipeline_mods[lib_index]["dev"].device_type,
self.pipeline_mods[lib_index]["dev"].device_id,
)

# Get the graph, lib, and parameters from GraphExecutorFactoryModule.
lib = self.pipeline_mods[lib_index]["lib"]
# Export the lib, graph, and parameters to disk.
lib.export_library(mconfig["lib_name"])
if self.pipeline_mods[lib_index]["export_cc"]:
lib.export_library(
mconfig["lib_name"], cc=self.pipeline_mods[lib_index]["export_cc"]
)
else:
lib.export_library(mconfig["lib_name"])

with open(mconfig["json_name"], "w") as file_handle:
file_handle.write(lib.graph_json)
with open(mconfig["params_name"], "wb") as file_handle:
Expand Down
14 changes: 9 additions & 5 deletions python/tvm/contrib/pipeline_executor_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ def build(pipe_configs):
# Use "mod_idx" as the key to create a "module_connection" map which is not only
# for the module index but also for the module connection used to build the pipeline.
module_string_config[mod_idx] = pipe_config
libs[mod_idx] = {"lib": lib, "dev": dev, "fcompile": mod_config["fcompile"]}
libs[mod_idx] = {
"lib": lib,
"dev": dev,
"fcompile": mod_config["fcompile"],
"export_cc": mod_config["export_cc"],
}

# Creating a text form configuration to record the "input_connection" and the
# "module_connection" information. The "input_connection" is used to record the
Expand Down Expand Up @@ -132,10 +137,7 @@ def export_library(factory, directory_path):
mconfig["json_name"] = "{}/json{}".format(directory_path, lib_index)
mconfig["params_name"] = "{}/params{}".format(directory_path, lib_index)
lib_config = factory.pipeline_mods[lib_index]
mconfig["dev"] = "{},{}".format(
lib_config["dev"].device_type,
lib_config["dev"].device_id,
)
mconfig["dev"] = "{},{}".format(lib_config["dev"].device_type, lib_config["dev"].device_id)
fcompile = lib_config["fcompile"]
if not fcompile:
fcompile = False
Expand Down Expand Up @@ -413,6 +415,7 @@ def __init__(self, mod=None):
self.fcompile = None
self.name = None
self.dev = None
self.export_cc = None
self.cpu_affinity = ""
self.idx = None
self.mod = mod
Expand Down Expand Up @@ -601,6 +604,7 @@ def get_config(self):
"target": module.target,
"fcompile": module.fcompile,
"dev": module.dev,
"export_cc": module.export_cc,
}

# Creating a map including pipeline inputs and subgraph inputs.
Expand Down
2 changes: 2 additions & 0 deletions tests/scripts/task_config_build_gpu.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,5 @@ echo set\(USE_LIBBACKTRACE AUTO\) >> config.cmake
echo set\(USE_CCACHE OFF\) >> config.cmake
echo set\(SUMMARIZE ON\) >> config.cmake
echo set\(HIDE_PRIVATE_SYMBOLS ON\) >> config.cmake
echo set\(USE_PIPELINE_EXECUTOR ON\) >> config.cmake
echo set\(USE_CUTLASS ON\) >> config.cmake