Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36488: [C++] Import/Export ArrowDeviceArray #36489

Merged
merged 16 commits into from
Jul 25, 2023
15 changes: 13 additions & 2 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,25 @@ class ARROW_EXPORT Buffer {
///
/// \note The passed memory must be kept alive through some other means
Buffer(const uint8_t* data, int64_t size)
: is_mutable_(false), is_cpu_(true), data_(data), size_(size), capacity_(size) {
: is_mutable_(false), is_cpu_(true), data_(data), size_(size), capacity_(size), device_type_(DeviceType::CPU) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need the is_cpu_ boolean? Is there a situation in which is_cpu_ is false and device_type_ != CPU?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The is_cpu_ boolean is still useful because of pinned host memory. Both CUDA_HOST and ROCM_HOST are pinned CPU memory which would have the corresponding device type (since the memory is managed by the device drivers) but is accessible to the CPU.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the pointers are accessible to the CPU, there's no guarantee that there isn't stream ordered work happening on them where it could cause a race condition to CPU code accessing the buffer. Given is_cpu_ is existing ABI, I'm wondering if it wouldn't be a safer option to have it return False in these cases where if someone wants to do CPU things against CUDA_HOST or ROCM_HOST buffers they should explicitly check the device type instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think the intent for the is_cpu_ flag was solely to know whether or not the data is accessible via CPU rather than caring about the stream ordered work synchronizations. I intend on expanding the existing classes/objects to make it easier to synchronize for stream ordered work as a separate abstraction, so I lean towards leaving it marked as true in this case, but I'm open to changing it.

@pitrou @felipeblazing @Rhonda85 what do you all think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had not envisioned this issue, but I think your approach is correct: is_cpu() == true means that address() is a CPU-accessible pointer to the data. Whether or not the "expected" data is already there will depend on the producer.

And, yes, a generalized version of Buffer could have an optional sync event or something...

SetMemoryManager(default_cpu_memory_manager());
}

Buffer(const uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm,
std::shared_ptr<Buffer> parent = NULLPTR)
std::shared_ptr<Buffer> parent = NULLPTR, DeviceType device_type = DeviceType::UNKNOWN)
: is_mutable_(false),
data_(data),
size_(size),
capacity_(size),
parent_(std::move(parent)) {
// will set device_type from the memory manager
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
SetMemoryManager(std::move(mm));
// if a device type is specified, use that instead. for example:
// CUDA_HOST. The CudaMemoryManager will set device_type_ to CUDA,
// but you can specify CUDA_HOST as the device type to override it.
westonpace marked this conversation as resolved.
Show resolved Hide resolved
if (device_type != DeviceType::UNKNOWN) {
device_type_ = device_type;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a DCHECK be useful here to confirm that device_type_ ended-up being compatible with the memory manager?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly, we'd have to add a Validate or Valid method to the memory manager to do so, to allow the memory manager to specify if the device_type is valid for it. Not sure how worthwhile that is though.

}

Buffer(uintptr_t address, int64_t size, std::shared_ptr<MemoryManager> mm,
Expand Down Expand Up @@ -240,6 +247,8 @@ class ARROW_EXPORT Buffer {

const std::shared_ptr<MemoryManager>& memory_manager() const { return memory_manager_; }

DeviceType device_type() const { return device_type_; }

std::shared_ptr<Buffer> parent() const { return parent_; }

/// \brief Get a RandomAccessFile for reading a buffer
Expand Down Expand Up @@ -294,6 +303,7 @@ class ARROW_EXPORT Buffer {
const uint8_t* data_;
int64_t size_;
int64_t capacity_;
DeviceType device_type_;

// null by default, but may be set
std::shared_ptr<Buffer> parent_;
Expand All @@ -309,6 +319,7 @@ class ARROW_EXPORT Buffer {
void SetMemoryManager(std::shared_ptr<MemoryManager> mm) {
memory_manager_ = std::move(mm);
is_cpu_ = memory_manager_->is_cpu();
device_type_ = memory_manager_->device()->device_type();
}

private:
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ using internal::checked_cast;
using internal::checked_pointer_cast;

static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
static const DeviceType kMyDeviceType = DeviceType::EXT_DEV;

static const int kMyDeviceAllowCopy = 1;
static const int kMyDeviceAllowView = 2;
Expand Down Expand Up @@ -70,6 +71,8 @@ class MyDevice : public Device {
return checked_cast<const MyDevice&>(other).value_ == value_;
}

DeviceType device_type() const override { return kMyDeviceType; }

std::shared_ptr<MemoryManager> default_memory_manager() override;

int value() const { return value_; }
Expand Down Expand Up @@ -256,13 +259,15 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceType::CPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyNonOwned(*cpu_src_, cpu_mm_));
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceType::CPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

Expand All @@ -271,6 +276,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -280,6 +286,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -290,13 +297,15 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceType::CPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyNonOwned(*my_copy_src_, cpu_mm_));
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceType::CPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

Expand All @@ -305,6 +314,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -315,6 +325,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -330,6 +341,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceType::CPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

Expand All @@ -338,6 +350,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), my_view_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -348,6 +361,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceType::CPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

Expand Down
Loading