Add pass-through tracking to auto-pinning buffers #4294

Merged
merged 6 commits on Oct 6, 2022
30 changes: 21 additions & 9 deletions dali/pipeline/executor/executor.h
@@ -605,7 +605,7 @@ void Executor<WorkspacePolicy, QueuePolicy>::SetupOutputInfo(OpGraph &graph) {
pipeline_outputs_ = graph.GetOutputs(output_names_);


graph.SetupMakeContiguousPassThrough(output_names_);
graph.SetupMakeContiguousPassThrough();

// If there are GPU outputs from given stages, we have to wait for them
auto has_gpu_output = [] (OpType stage_type, const auto &pipeline_outputs,
@@ -670,6 +670,22 @@ void Executor<WorkspacePolicy, QueuePolicy>::PrepinData(
}
return;
}

auto pin_cpu_passthrough = [](std::vector<tensor_data_store_queue_t> &tensor_to_store_queue,
const OpGraph &graph, int tid) {
auto origin_group = graph.GetTensorOrigin(tid);
// For all tensors that form a pass-through group ...
for (auto &origin_tensor_id : origin_group) {
// (we do this only for CPU data produced in CPU nodes)
auto &parent_tensor_queue =
get_queue<OpType::CPU, StorageDevice::CPU>(tensor_to_store_queue[origin_tensor_id]);
for (auto &batch : parent_tensor_queue) {
// ... mark all executor buffer queues as `pinned`
batch->set_pinned(true);
}
}
};

// We only pin what we need:
// The inputs of mixed ops are potentially used for H2D copies...
for (int i = 0; i < graph.NumOp(OpType::MIXED); i++) {
@@ -679,10 +695,8 @@ void Executor<WorkspacePolicy, QueuePolicy>::PrepinData(
for (int j = 0; j < node.spec.NumInput(); ++j) {
auto tid = node.parent_tensors[j];
// Use pinned memory only when it is useful
auto &parent_tensor_queue =
get_queue<OpType::CPU, StorageDevice::CPU>(tensor_to_store_queue_[tid]);
for (auto &tensor : parent_tensor_queue) {
tensor->set_pinned(node.spec.OutputDevice(0) == "gpu" && !RestrictPinnedMemUsage());
if (node.spec.OutputDevice(0) == "gpu" && !RestrictPinnedMemUsage()) {
pin_cpu_passthrough(tensor_to_store_queue, graph, tid);
}
}
}
@@ -693,10 +707,8 @@ void Executor<WorkspacePolicy, QueuePolicy>::PrepinData(
for (int j = 0; j < node.spec.NumInput(); ++j) {
auto tid = node.parent_tensors[j];
if (graph.Tensor(tid).producer.storage_device == StorageDevice::CPU) {
auto &parent_tensor_queue =
get_queue<OpType::CPU, StorageDevice::CPU>(tensor_to_store_queue_[tid]);
for (auto &tensor : parent_tensor_queue) {
tensor->set_pinned(node.spec.OutputDevice(0) == "gpu" && !RestrictPinnedMemUsage());
if (node.spec.OutputDevice(0) == "gpu" && !RestrictPinnedMemUsage()) {
pin_cpu_passthrough(tensor_to_store_queue, graph, tid);
}
}
}
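To make the executor change easier to follow, here is a minimal standalone sketch of the idea behind `pin_cpu_passthrough` (the `Buffer` type and the parent map below are invented for illustration, they are not DALI types): when a CPU buffer feeds an H2D copy, every buffer its memory originates from or passes through is pinned, because pass-through operators such as `Reshape` alias their input's allocation instead of owning one. The real code pins whole groups obtained from `OpGraph::GetTensorOrigin`; this sketch only follows a single pass-through chain.

```cpp
#include <unordered_map>

// Toy stand-in for an executor buffer queue entry (not a DALI type).
struct Buffer {
  bool pinned = false;
};

// For each tensor id, the tensor it passes memory through from; absent means it owns its memory.
using PassThroughParent = std::unordered_map<int, int>;

// Pin `tid` and every tensor its memory actually comes from, mirroring the effect of
// pin_cpu_passthrough: the whole origin chain ends up pinned, not just the direct input.
void PinOriginChain(int tid, const PassThroughParent &parent,
                    std::unordered_map<int, Buffer> &buffers) {
  int cur = tid;
  while (true) {
    buffers[cur].pinned = true;
    auto it = parent.find(cur);
    if (it == parent.end())
      break;
    cur = it->second;
  }
}

int main() {
  // copy_0 (id 0) owns its memory; pass_through_0 (id 1, e.g. a Reshape output) aliases it.
  std::unordered_map<int, Buffer> buffers{{0, {}}, {1, {}}};
  PassThroughParent parent{{1, 0}};
  PinOriginChain(1, parent, buffers);  // pinning the Reshape output also pins copy_0
  return buffers[0].pinned && buffers[1].pinned ? 0 : 1;
}
```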
102 changes: 102 additions & 0 deletions dali/pipeline/executor/executor_test.cc
@@ -17,6 +17,8 @@
#include <chrono>
#include <future>

#include "dali/core/tensor_shape.h"
#include "dali/pipeline/data/backend.h"
#include "dali/test/dali_test_decoder.h"
#include "dali/pipeline/executor/executor.h"
#include "dali/pipeline/executor/pipelined_executor.h"
@@ -603,4 +605,104 @@ TYPED_TEST(ExecutorSyncTest, TestPrefetchedExecution) {
test::CheckResults(ws, batch_size, 1, tl);
}


TYPED_TEST(ExecutorTest, TestPinning) {
auto exe = this->GetExecutor(this->batch_size_, this->num_threads_, 0, 1);
exe->Init();

// Build a basic cpu->gpu graph
OpGraph graph;
graph.AddOp(this->PrepareSpec(OpSpec("ExternalSource")
.AddArg("device", "cpu")
.AddArg("device_id", 0)
.AddOutput("data_0", "cpu")),
"ExternalSource_0");

// First set of Copy + Copy and Pass Through
graph.AddOp(this->PrepareSpec(OpSpec("Copy")
.AddArg("device", "cpu")
.AddInput("data_0", "cpu")
.AddOutput("copy_0", "cpu")),
"Copy_0");

graph.AddOp(this->PrepareSpec(OpSpec("Copy")
.AddArg("device", "cpu")
.AddInput("data_0", "cpu")
.AddOutput("copy_1", "cpu")),
"Copy_1");

graph.AddOp(this->PrepareSpec(OpSpec("Reshape")
.AddArg("device", "cpu")
.AddArg("layout", "")
.AddInput("copy_0", "cpu")
.AddOutput("pass_through_0", "cpu")),
"PassThrough_0");

// Trigger pinning of first set when it moves CPU -> GPU
graph.AddOp(this->PrepareSpec(OpSpec("MakeContiguous")
.AddArg("device", "mixed")
.AddInput("pass_through_0", "cpu")
.AddOutput("out_0", "gpu")),
"MakeContiguous_0");

// ...but not Copy_1, which we leave unpinned for comparison
graph.AddOp(this->PrepareSpec(OpSpec("MakeContiguous")
.AddArg("device", "mixed")
.AddInput("copy_1", "cpu")
.AddOutput("out_1", "cpu")),
"MakeContiguous_1");


// Second set of Copy and Pass Through
graph.AddOp(this->PrepareSpec(OpSpec("Copy")
.AddArg("device", "cpu")
.AddInput("data_0", "cpu")
.AddOutput("copy_2", "cpu")),
"Copy_2");

graph.AddOp(this->PrepareSpec(OpSpec("Reshape")
.AddArg("device", "cpu")
.AddArg("layout", "")
.AddInput("copy_2", "cpu")
.AddOutput("pass_through_1", "cpu")),
"PassThrough_1");

// Check pinning argument inputs to operators in GPU stage
graph.AddOp(this->PrepareSpec(OpSpec("random__CoinFlip")
.AddArg("device", "gpu")
.AddArgumentInput("probability", "pass_through_1")
.AddOutput("out_2", "gpu")),
"CoinFlip");

vector<string> outputs = {"copy_0_cpu", "copy_1_cpu", "pass_through_0_cpu", "copy_2_cpu",
"pass_through_1_cpu", "out_0_gpu", "out_1_cpu", "out_2_gpu"};

exe->Build(&graph, outputs);

// Set the data for the external source
auto *src_op = dynamic_cast<ExternalSource<CPUBackend> *>(graph.Node(OpType::CPU, 0).op.get());
TensorList<CPUBackend> tl;
tl.Resize(uniform_list_shape(this->batch_size_, TensorShape<>{}), DALI_FLOAT);
src_op->SetDataSource(tl);

exe->RunCPU();
exe->RunMixed();
exe->RunGPU();

DeviceWorkspace ws;
exe->Outputs(&ws);

// The outputs are shared from the executor, so we can check whether they are pinned
// in the way we expect.
// Currently we expect to pin anything that is a CPU argument input to a GPU operator, and
// anything that is a CPU -> GPU copy (not via a decoder), that is, a CPU input to a mixed
// operator that returns GPU data.
// The whole pass-through group should be pinned as well.

EXPECT_TRUE(ws.Output<CPUBackend>(0).is_pinned()); // copy_0_cpu
EXPECT_FALSE(ws.Output<CPUBackend>(1).is_pinned()); // copy_1_cpu
EXPECT_TRUE(ws.Output<CPUBackend>(2).is_pinned()); // pass_through_0_cpu
EXPECT_TRUE(ws.Output<CPUBackend>(3).is_pinned()); // copy_2_cpu
EXPECT_TRUE(ws.Output<CPUBackend>(4).is_pinned()); // pass_through_1_cpu
}

} // namespace dali
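As a reading aid for the expectations above, a toy restatement of the pinning policy the test encodes (illustrative only; the enum and struct below are not DALI types): copy_0, pass_through_0, copy_2 and pass_through_1 sit in pass-through groups feeding either a mixed operator with a GPU output or a GPU operator's argument input, while copy_1 only feeds a CPU-output MakeContiguous and therefore stays unpinned.

```cpp
// Illustrative only: a toy predicate mirroring the policy checked by TestPinning.
enum class Consumer { MixedWithGpuOutput, GpuArgumentInput, CpuOnly };

struct ToyBuffer {
  Consumer consumer;          // how the buffer (or its pass-through group) is consumed
  bool restrict_pinned_mem;   // corresponds to RestrictPinnedMemUsage()
};

// copy_0, pass_through_0 -> MixedWithGpuOutput  => pinned
// copy_2, pass_through_1 -> GpuArgumentInput    => pinned
// copy_1                 -> CpuOnly             => not pinned
bool ExpectPinned(const ToyBuffer &b) {
  if (b.restrict_pinned_mem)
    return false;
  return b.consumer == Consumer::MixedWithGpuOutput ||
         b.consumer == Consumer::GpuArgumentInput;
}
```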
82 changes: 48 additions & 34 deletions dali/pipeline/graph/graph_descr.cc
@@ -580,45 +580,18 @@ bool OpGraph::IsAlwaysContiguous(TensorNodeId tensor_id) const {

std::vector<TensorNodeId> OpGraph::GetOutputs(const std::vector<string>& output_names,
bool follow_pass_through) const {
std::vector<TensorNodeId> result;
std::vector<TensorNodeId> output_ids;
for (const auto& out : output_names) {
output_ids.push_back(TensorId(out));
}
if (!follow_pass_through) {
for (const auto& out : output_names) {
result.push_back(TensorId(out));
}
} else {
std::vector<bool> visited(tensor_nodes_.size());
std::vector<TensorNodeId> q;
for (const auto& out : output_names) {
q.push_back(TensorId(out));
}
while (!q.empty()) {
TensorNodeId tid = q.back();
q.pop_back();
if (visited[tid])
continue;
visited[tid] = true;
result.push_back(tid);
auto output = Tensor(tid).producer;
auto &output_op_node = Node(output.node);
auto &schema = output_op_node.spec.GetSchema();
// Special pass-through handling for the built-in MakeContiguous operator. We calculate it via an earlier pass.
if (schema.name() == "MakeContiguous") {
if (IsPassThrough(*output_op_node.op)) {
assert(output_op_node.parent_tensors.size() == 1);
q.push_back(output_op_node.parent_tensors[0]);
}
}
auto maybe_source = FollowPassThroughUp(output.node, tid);
if (maybe_source >= 0) {
q.push_back(maybe_source);
}
}
return output_ids;
}
return result;
return GetPassThroughGroupImpl(output_ids);
}


void OpGraph::SetupMakeContiguousPassThrough(const std::vector<string>& output_names) {
void OpGraph::SetupMakeContiguousPassThrough() {
// Detect the pass through for all MakeContiguous ops
for (int i = 0; i < NumOp(); i++) {
auto &node = Node(i);
@@ -632,8 +605,49 @@ void OpGraph::SetupMakeContiguousPassThrough(const std::vector<string>& output_n
}
}
}
pass_through_computed_ = true;
}


std::vector<TensorNodeId> OpGraph::GetTensorOrigin(TensorNodeId target_node) const {
return GetPassThroughGroupImpl({target_node});
}


std::vector<TensorNodeId> OpGraph::GetPassThroughGroupImpl(
const std::vector<TensorNodeId> &target_nodes) const {
DALI_ENFORCE(pass_through_computed_, "SetupMakeContiguousPassThrough must be called first.");
std::vector<bool> visited(tensor_nodes_.size());
std::vector<TensorNodeId> q;
q.insert(q.end(), target_nodes.begin(), target_nodes.end());
std::vector<TensorNodeId> result;

while (!q.empty()) {
TensorNodeId tid = q.back();
q.pop_back();
if (visited[tid])
continue;
visited[tid] = true;
result.push_back(tid);
auto producer_edge = Tensor(tid).producer;
auto &producer_op_node = Node(producer_edge.node);
auto &schema = producer_op_node.spec.GetSchema();
// Special pass-through handling for the built-in MakeContiguous operator. We calculate it via an earlier pass.
if (schema.name() == "MakeContiguous") {
if (IsPassThrough(*producer_op_node.op)) {
assert(producer_op_node.parent_tensors.size() == 1);
q.push_back(producer_op_node.parent_tensors[0]);
}
}
auto maybe_source = FollowPassThroughUp(producer_edge.node, tid);
if (maybe_source >= 0) {
q.push_back(maybe_source);
}
}
return result;
}


TensorNodeId OpGraph::FollowPassThroughUp(OpNodeId op, TensorNodeId passed_through) const {
// TODO(klecki): Use std::optional instead of returning -1. op_graph is transitively included
// in some .cu compilation units, so we cannot use it yet.
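For readers who prefer not to trace the diff, a simplified standalone sketch of what `GetPassThroughGroupImpl` computes (the `ToyPassThroughEdges` map is invented for illustration; in DALI the edges come from `FollowPassThroughUp` and the `MakeContiguous` special case): starting from the target tensors, it walks the pass-through relation upwards and collects every tensor visited, yielding the set of tensors that share the same underlying memory.

```cpp
#include <unordered_map>
#include <vector>

// Toy graph: for each tensor id, the ids of tensors its memory is passed through from.
using ToyPassThroughEdges = std::unordered_map<int, std::vector<int>>;

// Depth-first walk upwards over the pass-through relation, like GetPassThroughGroupImpl.
std::vector<int> PassThroughGroup(const std::vector<int> &targets,
                                  const ToyPassThroughEdges &edges, int num_tensors) {
  std::vector<bool> visited(num_tensors, false);
  std::vector<int> stack(targets);
  std::vector<int> result;
  while (!stack.empty()) {
    int tid = stack.back();
    stack.pop_back();
    if (visited[tid])
      continue;
    visited[tid] = true;
    result.push_back(tid);
    auto it = edges.find(tid);
    if (it != edges.end())
      for (int parent : it->second)
        stack.push_back(parent);
  }
  return result;
}

int main() {
  // Tensor 2 (a Reshape output) passes memory through from tensor 1 (a Copy output);
  // tensor 1 owns its allocation, tensor 0 is unrelated.
  ToyPassThroughEdges edges{{2, {1}}};
  auto group = PassThroughGroup({2}, edges, 3);  // yields {2, 1}
  return group.size() == 2 ? 0 : 1;
}
```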
30 changes: 29 additions & 1 deletion dali/pipeline/graph/op_graph.h
@@ -345,7 +345,21 @@ class DLL_PUBLIC OpGraph {
* We need to calculate it ahead of time to allow for correct allocation of prefetch queues
* (the PassThrough information is static).
*/
DLL_PUBLIC void SetupMakeContiguousPassThrough(const std::vector<string>& output_names);
DLL_PUBLIC void SetupMakeContiguousPassThrough();

/**
* @brief Return the TensorNodeId, or a set of them, where the memory backing the target_node
* actually originates or passes through.
*
* The result is either the node itself (if its producer allocates the memory) or the set of
* nodes that the memory was passed through.
*
* Valid only after the pass-through information is stable, that is, after
* SetupMakeContiguousPassThrough has been called.
*
* @param target_node Node whose origin we want to find.
*/
DLL_PUBLIC std::vector<TensorNodeId> GetTensorOrigin(TensorNodeId target_node) const;

private:
// Should be called only once for each tensor
@@ -362,10 +376,22 @@
/**
* @brief Find the parent tensor id that was used to produce the `passed_through` tensor by
* the op.
* This is just one step up the graph.
* @param op Id of the op node possibly doing the pass-through
* @param passed_through Id of the tensor node whose pass-through parent we search for
* @return -1 if no such node can be found.
*/
TensorNodeId FollowPassThroughUp(OpNodeId op, TensorNodeId passed_through) const;

/**
* @brief Follow the pass-through relation of the target nodes up the graph.
*
* @param target_nodes Group of nodes from which the (upward) search starts.
*/
std::vector<TensorNodeId> GetPassThroughGroupImpl(
const std::vector<TensorNodeId>& target_nodes) const;

/**
* @brief Recalculate OpNodes partitioning
*
@@ -430,6 +456,8 @@
void RemoveOpNode(OpNodeId id);

std::map<std::string, TensorNodeId> tensor_name_to_id_;

bool pass_through_computed_ = false;
};

} // namespace dali
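Finally, a hedged usage sketch of the call order implied by the new members (based only on what the diff shows; graph construction is elided): the pass-through information has to be computed once via `SetupMakeContiguousPassThrough` before `GetTensorOrigin` may be queried, which the new `DALI_ENFORCE` in `GetPassThroughGroupImpl` guards.

```cpp
// Sketch only: graph construction elided; the names follow the diff above.
void InspectOrigin(dali::OpGraph &graph, dali::TensorNodeId tid) {
  // Must run first: it marks pass-through MakeContiguous nodes and sets
  // pass_through_computed_, which GetPassThroughGroupImpl enforces.
  graph.SetupMakeContiguousPassThrough();

  // Now the origin group of any tensor can be queried, e.g. to pin every buffer
  // that shares memory with `tid`, as the executor's PrepinData does.
  for (dali::TensorNodeId origin : graph.GetTensorOrigin(tid)) {
    (void)origin;  // ... operate on each buffer in the group ...
  }
}
```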