Skip to content

Commit

Permalink
Bugfix.
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
  • Loading branch information
mzient committed Aug 2, 2024
1 parent 8c6e9e1 commit 8e3385f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
14 changes: 6 additions & 8 deletions dali/pipeline/pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,14 +360,12 @@ int Pipeline::AddOperatorImpl(const OpSpec &const_spec, const std::string &inst_
if (input_device == "gpu") {
SetupGPUInput(it);
} else {
if (device != "gpu") {
// device == gpu
// TODO(michalz): Add a D2H copy instead
DALI_ENFORCE(it->second.has_cpu,
make_string("Error while specifying ", FormatInput(spec, i),
". CPU input requested by operator exists only on GPU. CPU "
"operator cannot follow GPU operator."));
}
// device == gpu
// TODO(michalz): Add a D2H copy instead
DALI_ENFORCE(it->second.has_cpu,
make_string("Error while specifying ", FormatInput(spec, i),
". CPU input requested by operator exists only on GPU. CPU "
"operator cannot follow GPU operator."));
}
}

Expand Down
37 changes: 26 additions & 11 deletions dali/pipeline/pipeline_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class PipelineTest : public DALITest {

// Inputs must be know to the pipeline, i.e. ops
// must be added in a topological ordering.
EXPECT_THROW(
ASSERT_THROW(
pipe.AddOperator(
OpSpec("Copy")
.AddArg("device", dev1)
Expand All @@ -71,10 +71,25 @@ class PipelineTest : public DALITest {
.AddArg("device", "gpu")
.AddOutput("data", "gpu"));

// TODO(michalz): Remove this constraint and the tests. This should be a build-time error,
// with old executor, not a construction-time error.

// For dev1 = "cpu": Inputs to CPU ops must be on CPU,
// we do not auto-copy them from gpu to cpu.
// For dev1 = "gpu": CPU inputs to GPU ops must be on CPU,
// we will not copy them back to the host.
ASSERT_THROW(
pipe.AddOperator(
OpSpec("Copy")
.AddArg("device", dev1)
.AddInput("data", dev2)
.AddOutput("copy_out", dev1)),
std::runtime_error) << "backend = " << dev1 << " input = " << dev2 << " output = " << dev1;

if (dev1 == "cpu") {
// Inputs to CPU ops must already exist on CPU,
// we do not auto-copy them from gpu to cpu.
EXPECT_THROW(
ASSERT_THROW(
pipe.AddOperator(
OpSpec("Copy")
.AddArg("device", dev1)
Expand All @@ -94,7 +109,7 @@ class PipelineTest : public DALITest {
.AddOutput("data_3", dev1));

// Outputs must have unique names.
EXPECT_THROW(
ASSERT_THROW(
pipe.AddOperator(
OpSpec("Copy")
.AddArg("device", dev1)
Expand All @@ -110,7 +125,7 @@ class PipelineTest : public DALITest {
}
// All data must have unique names regardless
// of the device they exist on.
EXPECT_THROW(
ASSERT_THROW(
pipe.AddOperator(
OpSpec("Copy")
.AddArg("device", dev1)
Expand All @@ -120,7 +135,7 @@ class PipelineTest : public DALITest {


// CPU ops can only produce CPU outputs
EXPECT_THROW(
ASSERT_THROW(
pipe.AddOperator(
OpSpec("Copy")
.AddArg("device", dev1)
Expand Down Expand Up @@ -217,7 +232,7 @@ TYPED_TEST_SUITE(PipelineTest, NumThreads);
TEST_F(PipelineTestOnce, TestInputNotKnown) {
Pipeline pipe(1, 1, 0);

EXPECT_THROW(
ASSERT_THROW(
pipe.AddOperator(
OpSpec("Copy")
.AddArg("device", "cpu")
Expand Down Expand Up @@ -546,7 +561,7 @@ class PrefetchedPipelineTest : public DALITest {
TEST_F(PrefetchedPipelineTest, SetQueueSizesSeparatedFail) {
Pipeline pipe(this->batch_size_, 4, 0);
// By default we are non-separated execution
EXPECT_THROW(pipe.SetQueueSizes(5, 3), std::runtime_error);
ASSERT_THROW(pipe.SetQueueSizes(5, 3), std::runtime_error);
}

TEST_F(PrefetchedPipelineTest, SetExecutionTypesFailAfterBuild) {
Expand All @@ -559,7 +574,7 @@ TEST_F(PrefetchedPipelineTest, SetExecutionTypesFailAfterBuild) {

vector<std::pair<string, string>> outputs = {{"final_images", "gpu"}};
pipe.Build(outputs);
EXPECT_THROW(pipe.SetExecutionTypes(), std::runtime_error);
ASSERT_THROW(pipe.SetExecutionTypes(), std::runtime_error);
}

TEST_F(PrefetchedPipelineTest, SetQueueSizesFailAfterBuild) {
Expand All @@ -572,7 +587,7 @@ TEST_F(PrefetchedPipelineTest, SetQueueSizesFailAfterBuild) {

vector<std::pair<string, string>> outputs = {{"final_images", "gpu"}};
pipe.Build(outputs);
EXPECT_THROW(pipe.SetQueueSizes(2, 2), std::runtime_error);
ASSERT_THROW(pipe.SetQueueSizes(2, 2), std::runtime_error);
}

TEST_F(PrefetchedPipelineTest, TestFillQueues) {
Expand Down Expand Up @@ -694,7 +709,7 @@ TEST(PipelineTest, AddOperator) {
.AddOutput("data_out1", "cpu"), "second_op", first_op);
EXPECT_EQ(first_op, second_op);

EXPECT_THROW(pipe.AddOperator(OpSpec("Copy"), "another_op", first_op), std::runtime_error);
ASSERT_THROW(pipe.AddOperator(OpSpec("Copy"), "another_op", first_op), std::runtime_error);

int third_op = pipe.AddOperator(OpSpec("DummyOpToAdd")
.AddArg("device", "cpu")
Expand All @@ -709,7 +724,7 @@ TEST(PipelineTest, AddOperator) {
.AddInput("data_in0", "cpu")
.AddOutput("data_out3", "cpu"), "DummyOpNoSync");

EXPECT_THROW(pipe.AddOperator(OpSpec("DummyOpNoSync")
ASSERT_THROW(pipe.AddOperator(OpSpec("DummyOpNoSync")
.AddArg("device", "cpu")
.AddInput("data_in0", "cpu")
.AddOutput("data_out4", "cpu"), "DummyOpNoSync2", disallow_sync_op), std::runtime_error);
Expand Down

0 comments on commit 8e3385f

Please sign in to comment.