Add ready event to Tensor and TensorList. #5673

Merged
5 commits merged on Oct 14, 2024
51 changes: 50 additions & 1 deletion dali/core/cuda_event_pool_test.cc
@@ -1,4 +1,4 @@
// Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
// Copyright (c) 2020, 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
#include "dali/core/cuda_error.h"
#include "dali/core/cuda_event_pool.h"
#include "dali/core/cuda_stream.h"
#include "dali/core/cuda_shared_event.h"

namespace dali {
namespace test {
@@ -58,5 +59,53 @@ TEST(EventPoolTest, PutGet) {
t.join();
}

TEST(CUDASharedEventTest, RefCounting) {
int devices = 0;
(void)cudaGetDeviceCount(&devices);
if (devices == 0) {
(void)cudaGetLastError(); // No CUDA devices - we don't care about the error
GTEST_SKIP();
}

CUDASharedEvent ev1 = CUDASharedEvent::GetFromPool();
CUDASharedEvent ev2 = CUDASharedEvent::GetFromPool();
ASSERT_EQ(ev1, ev1.get()) << "Sanity check failed - object not equal to itself.";
ASSERT_NE(ev1.get(), nullptr) << "Sanity check failed - returned null instead of throwing.";
ASSERT_NE(ev2.get(), nullptr) << "Sanity check failed - returned null instead of throwing.";
ASSERT_NE(ev1, nullptr) << "Sanity check failed - comparison to null broken.";
ASSERT_NE(ev1, ev2) << "Sanity check failed - returned the same object twice.";

EXPECT_EQ(ev1.use_count(), 1);
EXPECT_EQ(ev2.use_count(), 1);
CUDASharedEvent ev3 = ev1;
EXPECT_EQ(ev1, ev3);
EXPECT_EQ(ev1.use_count(), 2);
EXPECT_EQ(ev3.use_count(), 2);
ev1.reset();
EXPECT_EQ(ev1.use_count(), 0);
EXPECT_EQ(ev3.use_count(), 1);
}

TEST(CUDASharedEventTest, ReturnToPool) {
int devices = 0;
(void)cudaGetDeviceCount(&devices);
if (devices == 0) {
(void)cudaGetLastError(); // No CUDA devices - we don't care about the error
GTEST_SKIP();
}

CUDAEventPool pool;

CUDASharedEvent ev1 = CUDASharedEvent::GetFromPool(pool);
EXPECT_NE(ev1, nullptr);
cudaEvent_t orig = ev1.get();
ev1.reset();
EXPECT_EQ(ev1, nullptr);
CUDASharedEvent ev2 = CUDASharedEvent::GetFromPool(pool);
EXPECT_EQ(ev2.get(), orig) << "Should have got the sole event from the pool";
ev1 = CUDASharedEvent::GetFromPool(pool);
EXPECT_NE(ev1, ev2);
}

} // namespace test
} // namespace dali
32 changes: 26 additions & 6 deletions dali/pipeline/data/tensor.h
@@ -24,10 +24,11 @@

#include "dali/core/common.h"
#include "dali/core/error_handling.h"
#include "dali/core/util.h"
#include "dali/core/cuda_shared_event.h"
#include "dali/core/span.h"
#include "dali/core/traits.h"
#include "dali/core/tensor_shape.h"
#include "dali/core/util.h"
#include "dali/pipeline/data/backend.h"
#include "dali/pipeline/data/buffer.h"
#include "dali/pipeline/data/meta.h"
@@ -208,6 +209,7 @@ class Tensor : public Buffer<Backend> {
// Copy the tensor's meta-data
shape_ = t.shape_;
meta_ = t.meta_;
ready_ = t.ready_;
}

/**
@@ -227,7 +229,7 @@
*/
inline void ShareData(shared_ptr<void> ptr, size_t bytes, bool pinned,
const TensorShape<> &shape, DALIDataType type, int device_id,
AccessOrder order = {}) {
AccessOrder order = {}, CUDASharedEvent ready = {}) {
Index new_size = volume(shape);
DALI_ENFORCE(new_size == 0 || type != DALI_NO_TYPE,
"Only empty tensors can be shared without specifying a type.");
@@ -246,6 +248,7 @@
size_ = new_size;
num_bytes_ = bytes;
device_ = device_id;
ready_ = std::move(ready);

// If the input pointer stores a non-zero size allocation, mark
// that we are sharing our underlying data
@@ -276,8 +279,10 @@
* the dependency on the work that is happening on another device.
*/
inline void ShareData(void *ptr, size_t bytes, bool pinned, const TensorShape<> &shape,
DALIDataType type, int device_id, AccessOrder order = {}) {
ShareData(shared_ptr<void>(ptr, [](void *) {}), bytes, pinned, shape, type, device_id, order);
DALIDataType type, int device_id, AccessOrder order = {},
CUDASharedEvent ready = {}) {
ShareData(shared_ptr<void>(ptr, [](void *) {}), bytes, pinned, shape, type,
device_id, order, std::move(ready));
}

/**
@@ -302,8 +307,8 @@
* the dependency on the work that is happening on another device.
*/
inline void ShareData(void *ptr, size_t bytes, bool pinned, DALIDataType type, int device_id,
AccessOrder order = {}) {
ShareData(ptr, bytes, pinned, { 0 }, type, device_id, order);
AccessOrder order = {}, CUDASharedEvent ready = {}) {
ShareData(ptr, bytes, pinned, { 0 }, type, device_id, order, std::move(ready));
}

inline void Reset(AccessOrder order = {}) {
@@ -449,9 +454,24 @@
return meta_.ShouldSkipSample();
}

/** Returns an optional, shared handle to CUDA event that marks the readiness of the tensor data.
*
* This ready event may be shared by multiple tensor lists or tensors. It may also be null.
* Typical DALI operators don't need to record or wait for this event.
*/
const CUDASharedEvent &ready_event() const {
return ready_;
}

/** Sets the shared event handle that marks the readiness of the tensor data. */
void set_ready_event(CUDASharedEvent ready) {
ready_ = std::move(ready);
}

protected:
TensorShape<> shape_ = { 0 };
DALIMeta meta_;
CUDASharedEvent ready_;
USE_BUFFER_MEMBERS();

// So TensorList can access data_ of the tensor directly
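Below is a minimal usage sketch of the new ready_event() / set_ready_event() accessors added above, assuming a populated Tensor<GPUBackend> and two pre-created CUDA streams; the helper function, tensor and stream names are illustration-only and not code from this PR. CUDA_CALL is DALI's error-checking macro from dali/core/cuda_error.h.

#include <cuda_runtime.h>
#include <utility>

#include "dali/core/cuda_error.h"
#include "dali/core/cuda_shared_event.h"
#include "dali/pipeline/data/tensor.h"

void PublishAndConsume(dali::Tensor<dali::GPUBackend> &tensor,
                       cudaStream_t producer_stream, cudaStream_t consumer_stream) {
  // Producer side: record an event on the stream that wrote the data and attach it.
  dali::CUDASharedEvent ready = dali::CUDASharedEvent::GetFromPool();
  CUDA_CALL(cudaEventRecord(ready.get(), producer_stream));
  tensor.set_ready_event(std::move(ready));

  // Consumer side: the ready event is optional, so check for null before waiting on it.
  if (tensor.ready_event() != nullptr)
    CUDA_CALL(cudaStreamWaitEvent(consumer_stream, tensor.ready_event().get(), 0));
}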
23 changes: 10 additions & 13 deletions dali/pipeline/data/tensor_list.cc
@@ -193,7 +193,6 @@ template <typename Backend>
TensorList<Backend> &TensorList<Backend>::operator=(TensorList<Backend> &&other) noexcept {
if (&other != this) {
contiguous_buffer_ = std::move(other.contiguous_buffer_);
buffer_bkp_ = std::move(other.buffer_bkp_);
tensors_ = std::move(other.tensors_);

state_ = other.state_;
@@ -759,25 +758,23 @@ void TensorList<Backend>::MakeNoncontiguous() {

template <typename Backend>
void TensorList<Backend>::DoMakeNoncontiguous() {
// We clear the contiguous_buffer_, as we are now non-contiguous.
buffer_bkp_ = contiguous_buffer_.get_data_ptr();
contiguous_buffer_.reset();
auto &contiguous_ptr = contiguous_buffer_.get_data_ptr();
for (auto &t : tensors_) {
// If the Tensor was aliasing the contiguous buffer, mark it as not sharing any data.
// This will allow for the individual buffers to be resized.
// The downside of this is we may keep the big contiguous buffer until all individual
// samples are replaced.
if (same_managed_object(buffer_bkp_, t.data_)) {
if (same_managed_object(contiguous_ptr, t.data_)) {
t.detach();
}
}
contiguous_buffer_.reset();
}


template <typename Backend>
void TensorList<Backend>::Reset() {
contiguous_buffer_.reset();
buffer_bkp_.reset();
// TODO(klecki): Is there any benefit to call Reset on all?
tensors_.clear();

@@ -786,8 +783,8 @@ void TensorList<Backend>::Reset() {
sample_dim_ = -1;
shape_ = {};
layout_ = "";
// N.B. state_, pinned_, order_ and device_ are not reset here, as they might be previously set
// up via the executor - TODO(klecki) - consider if we want to keep this behaviour
// N.B. state_, pinned_, order_, device_ and ready_ are not reset here, as they might be
// previously set up via the executor - TODO(klecki) - consider if we want to keep this behaviour
}


@@ -857,8 +854,6 @@ void TensorList<Backend>::ShareData(const TensorList<Backend> &tl) {
if (!same_data)
Reset();

buffer_bkp_.reset(); // TODO(michalz): perhaps we should copy it from the source, too?

state_ = tl.state_;
curr_num_tensors_ = tl.curr_num_tensors_;
type_ = tl.type_;
@@ -868,6 +863,7 @@
pinned_ = tl.pinned_;
order_ = tl.order_;
device_ = tl.device_;
ready_ = tl.ready_;

if (tl.IsContiguous()) {
if (!same_data)
@@ -946,7 +942,7 @@ Tensor<Backend> TensorList<Backend>::AsReshapedTensor(const TensorShape<> &new_s
}

result.ShareData(std::move(ptr), capacity(), is_pinned(),
new_shape, type(), device_id(), order());
new_shape, type(), device_id(), order(), ready_);

auto result_layout = GetLayout();
if (result_layout.ndim() + 1 == new_shape.sample_dim()) {
Expand Down Expand Up @@ -974,11 +970,11 @@ Tensor<Backend> TensorList<Backend>::AsTensor() {
template <typename Backend>
void TensorList<Backend>::ShareData(shared_ptr<void> ptr, size_t bytes, bool pinned,
const TensorListShape<> &shape, DALIDataType type,
int device_id, AccessOrder order, const TensorLayout &layout) {
int device_id, AccessOrder order, const TensorLayout &layout,
CUDASharedEvent ready) {
contiguous_buffer_.set_backing_allocation(std::move(ptr), bytes, pinned,
type, shape.num_elements(),
device_id, order);
buffer_bkp_.reset();
tensors_.clear();
tensors_.resize(shape.num_samples());

@@ -990,6 +986,7 @@ void TensorList<Backend>::ShareData(shared_ptr<void> ptr, size_t bytes, bool pin
layout_ = layout;
pinned_ = pinned;
device_ = device_id;
ready_ = ready;
if (order)
order_ = order;
recreate_views();
29 changes: 23 additions & 6 deletions dali/pipeline/data/tensor_list.h
@@ -485,7 +485,8 @@ class DLL_PUBLIC TensorList {
*/
DLL_PUBLIC void ShareData(shared_ptr<void> ptr, size_t bytes, bool pinned,
const TensorListShape<> &shape, DALIDataType type, int device_id,
AccessOrder order = {}, const TensorLayout &layout = "");
AccessOrder order = {}, const TensorLayout &layout = "",
CUDASharedEvent ready = {});

/**
* @brief Set other batch as backing memory for this one. Preserves the contiguity status.
Expand Down Expand Up @@ -587,6 +588,22 @@ class DLL_PUBLIC TensorList {
*/
size_t capacity() const noexcept;

/** Returns an optional, shared handle to CUDA event that marks the readiness of the data.
*
* This ready event may be shared by multiple tensor lists or tensors. It may also be null.
* Typical DALI operators don't need to record or wait for this event.
*/
const CUDASharedEvent &ready_event() const {
return ready_;
}

/** Sets the shared event handle that marks the readiness of the data. */
void set_ready_event(CUDASharedEvent ready) {
ready_ = std::move(ready);
}



/**
* @brief Returns the size in bytes of the underlying data chunks
* TODO(klecki): Temporary API to be reworked, do not use.
@@ -765,7 +782,6 @@

// Memory backing
Buffer<Backend> contiguous_buffer_;
std::weak_ptr<void> buffer_bkp_;
// Memory, sample aliases and metadata
// TODO(klecki): Remove SampleWorkspace (only place where we actually need those Tensor objects)
// and swap to plain Buffer instead of using actual Tensors.
@@ -774,15 +790,16 @@
// State and metadata that should be uniform regardless of the contiguity state.
// Sample aliases should match the information stored below.
State state_;
int curr_num_tensors_;
TypeInfo type_{};
bool pinned_ = true;
int curr_num_tensors_ = 0;
int sample_dim_ = -1;
int device_ = CPU_ONLY_DEVICE_ID;
TypeInfo type_{};
TensorListShape<> shape_;
TensorLayout layout_;

bool pinned_ = true;
int device_ = CPU_ONLY_DEVICE_ID;
AccessOrder order_ = AccessOrder::host();
CUDASharedEvent ready_;

// So we can access the members of other TensorLists
// with different template types
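To illustrate the extended TensorList::ShareData() overload declared above (with the trailing ready argument), here is a sketch of wrapping an externally produced device allocation together with its ready event; dev_ptr, bytes, shape, device_id and stream are hypothetical inputs, DALI_UINT8 is an arbitrary example type, and the AccessOrder-from-stream construction is assumed to match DALI's existing API.

#include <cuda_runtime.h>
#include <memory>
#include <utility>

#include "dali/core/cuda_error.h"
#include "dali/core/cuda_shared_event.h"
#include "dali/pipeline/data/tensor_list.h"

void WrapExternalBuffer(dali::TensorList<dali::GPUBackend> &tl, void *dev_ptr, size_t bytes,
                        const dali::TensorListShape<> &shape, int device_id,
                        cudaStream_t stream) {
  // Record readiness of `dev_ptr` after the producing work has been enqueued on `stream`.
  dali::CUDASharedEvent ready = dali::CUDASharedEvent::GetFromPool();
  CUDA_CALL(cudaEventRecord(ready.get(), stream));

  // Non-owning shared_ptr: the caller retains ownership of the allocation.
  tl.ShareData(std::shared_ptr<void>(dev_ptr, [](void *) {}), bytes, /* pinned = */ false,
               shape, dali::DALI_UINT8, device_id, dali::AccessOrder(stream), "",
               std::move(ready));
}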
10 changes: 8 additions & 2 deletions dali/pipeline/data/tensor_list_test.cc
@@ -1994,12 +1994,18 @@ TEST_F(TensorListVariableBatchSizeTest, UpdatePropertiesFromSamples) {
}

TEST(TensorList, ResizeOverheadPerf) {
cudaFree(0);
(void)cudaFree(0);
#ifdef DALI_DEBUG
int niter = 2000;
int warmup = 500;
#else
int niter = 20000;
int warmup = 5000;
#endif
int total_size = 256 << 10;
int nsamples = 1024;
auto shape = uniform_list_shape(nsamples, {total_size / nsamples});
for (int i = 0; i < 5000; i++) {
for (int i = 0; i < warmup; i++) {
TensorList<CPUBackend> tl;
tl.set_pinned(false);
tl.Resize(shape, DALI_UINT8);
4 changes: 2 additions & 2 deletions dali/pipeline/executor/executor2/exec_graph.cc
@@ -143,7 +143,7 @@ std::unique_ptr<Workspace> ExecNode::CreateOpWorkspace() {
return ws;
}

std::pair<std::unique_ptr<Workspace>, SharedEventLease>
std::pair<std::unique_ptr<Workspace>, CUDASharedEvent>
ExecNode::GetWorkspace(WorkspaceParams params) {
if (!ws_) {
assert(!has_workspace_);
@@ -160,7 +160,7 @@
ApplyWorkspaceParams(*ws_, params);

if (ws_->output_order().is_device())
ws_event_ = SharedEventLease::Get();
ws_event_ = CUDASharedEvent::GetFromPool();
else
ws_event_.reset();
ws_->set_event(ws_event_);
10 changes: 5 additions & 5 deletions dali/pipeline/executor/executor2/exec_graph.h
@@ -25,7 +25,7 @@
#include <unordered_map>
#include <vector>

#include "dali/pipeline/executor/executor2/shared_event_lease.h"
#include "dali/core/cuda_shared_event.h"
#include "dali/pipeline/operator/operator.h"
#include "dali/pipeline/workspace/workspace.h"

@@ -62,13 +62,13 @@ struct PipelineOutput {
PipelineOutput(const PipelineOutput &) {
throw std::logic_error("This object is not copyable, but std::any needs it at compile time.");
}
PipelineOutput(const Workspace &ws, SharedEventLease event, std::optional<int> device)
PipelineOutput(const Workspace &ws, CUDASharedEvent event, std::optional<int> device)
: workspace(ws), event(std::move(event)), device(device) {}

/** The payload */
Workspace workspace;
/** Owns the event used by the workspace */
SharedEventLease event;
CUDASharedEvent event;
/** The ordinal of the device used by the workspace */
std::optional<int> device;
};
@@ -179,7 +179,7 @@ class DLL_PUBLIC ExecNode {
* Requesting multiple workspaces is a coding error.
* The workspace is updated with the WorkspaceParams supplied to this function.
*/
std::pair<std::unique_ptr<Workspace>, SharedEventLease> GetWorkspace(WorkspaceParams params);
std::pair<std::unique_ptr<Workspace>, CUDASharedEvent> GetWorkspace(WorkspaceParams params);

/** Puts the workspace back into the node for later reuse.
*
@@ -244,7 +244,7 @@ class DLL_PUBLIC ExecNode {
bool has_workspace_ = false;

/** The event associated with the workspace */
SharedEventLease ws_event_;
CUDASharedEvent ws_event_;

/** Moves to a new iteration. */
void NextIter() {
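Finally, a sketch of how a consumer of PipelineOutput might synchronize with the produced data through the shared event that now uses CUDASharedEvent; the dali::exec2 namespace, the function and the caller-provided stream are assumptions rather than code from this PR.

#include <cuda_runtime.h>

#include "dali/core/cuda_error.h"
#include "dali/pipeline/executor/executor2/exec_graph.h"

void WaitForPipelineOutput(const dali::exec2::PipelineOutput &out, cudaStream_t stream) {
  // The event may be null (e.g. for host-only outputs); wait only when it exists.
  if (out.event != nullptr)
    CUDA_CALL(cudaStreamWaitEvent(stream, out.event.get(), 0));
  // Outputs held by out.workspace can now be consumed safely on `stream`.
}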