Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid full construction of the pipeline during construction and fix seed support in serialized pipelines #16

Merged
merged 12 commits into from
Jun 29, 2018
Merged
14 changes: 0 additions & 14 deletions dali/c_api/c_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ void daliCreatePipeline(daliPipelineHandle* pipe_handle,
batch_size,
num_threads,
device_id,
-1,
true,
true);
pipe->Build();
Expand All @@ -51,19 +50,6 @@ void daliOutput(daliPipelineHandle* pipe_handle) {
pipeline->Outputs(ws);
}

void* daliTensorAt(daliPipelineHandle* pipe_handle, int n) {
dali::DeviceWorkspace* ws = reinterpret_cast<dali::DeviceWorkspace*>(pipe_handle->ws);
if (ws->OutputIsType<dali::CPUBackend>(n)) {
dali::Tensor<dali::CPUBackend> *t = new dali::Tensor<dali::CPUBackend>();
t->ShareData(ws->Output<dali::CPUBackend>(n));
return t;
} else {
dali::Tensor<dali::GPUBackend> *t = new dali::Tensor<dali::GPUBackend>();
t->ShareData(ws->Output<dali::GPUBackend>(n));
return t;
}
}

int64_t* daliShapeAt(daliPipelineHandle* pipe_handle, int n) {
dali::DeviceWorkspace* ws = reinterpret_cast<dali::DeviceWorkspace*>(pipe_handle->ws);
int64_t* c_shape = nullptr;
Expand Down
36 changes: 32 additions & 4 deletions dali/c_api/c_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,45 @@ extern "C" {
void* ws;
};

/**
* @brief Create DALI pipeline. Setting batch_size,
* num_threads or device_id here overrides
* values stored in the serialized pipeline.
*/
DLL_PUBLIC void daliCreatePipeline(daliPipelineHandle* pipe_handle,
const char *serialized_pipeline,
int length,
int batch_size,
int num_threads,
int device_id);
int batch_size = -1,
int num_threads = -1,
int device_id = -1);

/**
* @brief Start the execution of the pipeline.
*/
DLL_PUBLIC void daliRun(daliPipelineHandle* pipe_handle);

/**
* @brief Wait till the output of the pipeline is ready.
*/
DLL_PUBLIC void daliOutput(daliPipelineHandle* pipe_handle);
DLL_PUBLIC void* daliTensorAt(daliPipelineHandle* pipe_handle, int n);

/**
* @brief Return the shape of the output tensor
* stored at position `n` in the pipeline.
* This function may only be called after
* calling Output function.
*/
DLL_PUBLIC int64_t* daliShapeAt(daliPipelineHandle* pipe_handle, int n);

/**
* @brief Copy the output tensor stored
* at position `n` in the pipeline.
*/
DLL_PUBLIC void daliCopyTensorNTo(daliPipelineHandle* pipe_handle, void* dst, int n);

/**
* @brief Delete the pipeline object.
*/
DLL_PUBLIC void daliDeletePipeline(daliPipelineHandle* pipe_handle);
}

Expand Down
2 changes: 1 addition & 1 deletion dali/pipeline/op_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ void OpGraph::AddOp(const OpSpec &spec, const std::string& name) {
DALI_ENFORCE(ret.second, "Operator '" + spec.name() +
"' has output with name " + name + ", but output "
"with this name already exists as output of op '" +
this->node(TensorSourceID(name)).spec.name());
this->node(TensorSourceID(name)).spec.name() + "'");
}
}

Expand Down
89 changes: 69 additions & 20 deletions dali/pipeline/pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ void Pipeline::AddOperator(OpSpec spec, const std::string& inst_name) {
DALI_ENFORCE(!built_, "Alterations to the pipeline after "
"\"Build()\" has been called are not allowed");

// Take a copy of the passed OpSpec for serialization purposes
this->op_specs_.push_back(make_pair(inst_name, spec));

// Validate op device
string device = spec.GetArgument<string>("device");
DALI_ENFORCE(device == "cpu" ||
Expand Down Expand Up @@ -148,18 +145,46 @@ void Pipeline::AddOperator(OpSpec spec, const std::string& inst_name) {
"Output name insertion failure.");
}

// Add the operator to the graph
PrepareOpSpec(&spec);
graph_.AddOp(spec, inst_name);
// Take a copy of the passed OpSpec for serialization purposes
this->op_specs_.push_back(make_pair(inst_name, spec));
this->op_specs_to_serialize_.push_back(true);
}

void Pipeline::Build(vector<std::pair<string, string>> output_names) {
DeviceGuard g(device_id_);

output_names_ = output_names;
DALI_ENFORCE(!built_, "\"Build()\" can only be called once.");
DALI_ENFORCE(output_names.size() > 0, "User specified zero outputs.");


// Creating the executor
if (pipelined_execution_ && async_execution_) {
executor_.reset(new AsyncPipelinedExecutor(
batch_size_, num_threads_,
device_id_, bytes_per_sample_hint_,
set_affinity_, max_num_stream_));
executor_->Init();
} else if (pipelined_execution_) {
executor_.reset(new PipelinedExecutor(
batch_size_, num_threads_,
device_id_, bytes_per_sample_hint_,
set_affinity_, max_num_stream_));
} else if (async_execution_) {
DALI_FAIL("Not implemented.");
} else {
executor_.reset(new Executor(
batch_size_, num_threads_,
device_id_, bytes_per_sample_hint_,
set_affinity_, max_num_stream_));
}

// Creating the graph
for (auto& name_op_spec : op_specs_) {
string& inst_name = name_op_spec.first;
OpSpec op_spec = name_op_spec.second;
PrepareOpSpec(&op_spec);
graph_.AddOp(op_spec, inst_name);
}

// Validate the output tensors names
vector<string> outputs;
for (const auto &name_pair : output_names) {
Expand Down Expand Up @@ -207,11 +232,17 @@ void Pipeline::Build(vector<std::pair<string, string>> output_names) {
}
}

DeviceGuard d(device_id_);
// Load the final graph into the executor
executor_->Build(&graph_, outputs);
built_ = true;
}

void Pipeline::SetOutputNames(vector<std::pair<string, string>> output_names) {
output_names_ = output_names;
}


void Pipeline::RunCPU() {
DALI_ENFORCE(built_,
"\"Build()\" must be called prior to executing the pipeline.");
Expand Down Expand Up @@ -242,14 +273,21 @@ void Pipeline::Outputs(DeviceWorkspace *ws) {
void Pipeline::SetupCPUInput(std::map<string, EdgeMeta>::iterator it,
int input_idx, OpSpec *spec) {
if (!it->second.has_contiguous) {
if (graph_.TensorExists(OpSpec::TensorName("contiguous_" + it->first, "cpu"))) return;
// We check if the make contiguous op already exists
std::string op_name = "__MakeContiguous_" + it->first;
if (std::find_if(op_specs_.begin(), op_specs_.end(),
[&op_name] (const std::pair<string, OpSpec>& p) {
return p.first == op_name;}) != op_specs_.end()) {
return;
}

OpSpec make_contiguous_spec =
OpSpec("MakeContiguous")
.AddArg("device", "mixed")
.AddInput(it->first, "cpu")
.AddOutput("contiguous_" + it->first, "cpu");
PrepareOpSpec(&make_contiguous_spec);
graph_.AddOp(make_contiguous_spec, "__MakeContiguous_" + it->first);
this->op_specs_.push_back(make_pair("__MakeContiguous_" + it->first, make_contiguous_spec));
this->op_specs_to_serialize_.push_back(false);
}

// Update the OpSpec to use the contiguous input
Expand All @@ -262,14 +300,21 @@ void Pipeline::SetupCPUInput(std::map<string, EdgeMeta>::iterator it,

void Pipeline::SetupGPUInput(std::map<string, EdgeMeta>::iterator it) {
if (it->second.has_gpu) return;
if (graph_.TensorExists(OpSpec::TensorName(it->first, "gpu"))) return;
// We check if the copy_to_dev op already exists
std::string op_name = "__Copy_" + it->first;
if (std::find_if(op_specs_.begin(), op_specs_.end(),
[&op_name] (const std::pair<string, OpSpec>& p) {
return p.first == op_name;}) != op_specs_.end()) {
return;
}

OpSpec copy_to_dev_spec =
OpSpec("MakeContiguous")
.AddArg("device", "mixed")
.AddInput(it->first, "cpu")
.AddOutput(it->first, "gpu");
PrepareOpSpec(&copy_to_dev_spec);
graph_.AddOp(copy_to_dev_spec, "__Copy_" + it->first);
this->op_specs_.push_back(make_pair("__Copy_" + it->first, copy_to_dev_spec));
this->op_specs_to_serialize_.push_back(false);
}

void Pipeline::PrepareOpSpec(OpSpec *spec) {
Expand All @@ -285,6 +330,8 @@ string Pipeline::SerializeToProtobuf() const {
dali_proto::PipelineDef pipe;
pipe.set_num_threads(this->num_threads());
pipe.set_batch_size(this->batch_size());
pipe.set_device_id(this->device_id());
pipe.set_seed(this->original_seed_);

// loop over external inputs
for (auto &name : external_inputs_) {
Expand All @@ -293,14 +340,16 @@ string Pipeline::SerializeToProtobuf() const {

// loop over ops, create messages and append
for (size_t i = 0; i < this->op_specs_.size(); ++i) {
dali_proto::OpDef *op_def = pipe.add_op();
if (op_specs_to_serialize_[i]) {
dali_proto::OpDef *op_def = pipe.add_op();

const auto& p = this->op_specs_[i];
const OpSpec& spec = p.second;
const auto& p = this->op_specs_[i];
const OpSpec& spec = p.second;

// As long as spec isn't an ExternalSource node, serialize
if (spec.name() != "ExternalSource") {
spec.SerializeToProtobuf(op_def, p.first);
// As long as spec isn't an ExternalSource node, serialize
if (spec.name() != "ExternalSource") {
spec.SerializeToProtobuf(op_def, p.first);
}
}
}

Expand Down
Loading