diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 65f1abda16106..427c63d5eedf5 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <cstring>
 #include <memory>
+#include <optional>
 #include <string>
 #include <string_view>
 #include <utility>
@@ -57,18 +58,31 @@ class ARROW_EXPORT Buffer {
   ///
   /// \note The passed memory must be kept alive through some other means
   Buffer(const uint8_t* data, int64_t size)
-      : is_mutable_(false), is_cpu_(true), data_(data), size_(size), capacity_(size) {
+      : is_mutable_(false),
+        is_cpu_(true),
+        data_(data),
+        size_(size),
+        capacity_(size),
+        device_type_(DeviceAllocationType::kCPU) {
     SetMemoryManager(default_cpu_memory_manager());
   }
 
   Buffer(const uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm,
-         std::shared_ptr<Buffer> parent = NULLPTR)
+         std::shared_ptr<Buffer> parent = NULLPTR,
+         std::optional<DeviceAllocationType> device_type = std::nullopt)
       : is_mutable_(false),
         data_(data),
         size_(size),
         capacity_(size),
         parent_(std::move(parent)) {
+    // SetMemoryManager will also set device_type_
     SetMemoryManager(std::move(mm));
+    // If a device type is specified, use it instead. For example, the
+    // CudaMemoryManager will set device_type_ to kCUDA, but the caller
+    // can pass kCUDA_HOST to override it for pinned host memory.
+    if (device_type != std::nullopt) {
+      device_type_ = device_type;
+    }
   }
 
   Buffer(uintptr_t address, int64_t size, std::shared_ptr<MemoryManager> mm,
@@ -240,6 +254,8 @@ class ARROW_EXPORT Buffer {
 
   const std::shared_ptr<MemoryManager>& memory_manager() const { return memory_manager_; }
 
+  std::optional<DeviceAllocationType> device_type() const { return device_type_; }
+
   std::shared_ptr<Buffer> parent() const { return parent_; }
 
   /// \brief Get a RandomAccessFile for reading a buffer
@@ -294,6 +310,7 @@ class ARROW_EXPORT Buffer {
   const uint8_t* data_;
   int64_t size_;
   int64_t capacity_;
+  std::optional<DeviceAllocationType> device_type_;  // null by default, but may be set
 
   std::shared_ptr<Buffer> parent_;
 
@@ -309,6 +326,7 @@ class ARROW_EXPORT Buffer {
   void SetMemoryManager(std::shared_ptr<MemoryManager> mm) {
     memory_manager_ = std::move(mm);
     is_cpu_ = memory_manager_->is_cpu();
+    device_type_ = memory_manager_->device()->device_type();
   }
 
  private:
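Reviewer note (not part of the patch): the new accessor is optional-valued because device_type_ is only populated once a memory manager (or an explicit override) is attached. A minimal usage sketch, assuming only the buffer.h changes above:

// Sketch: querying the device type of a plain CPU buffer over borrowed memory.
#include <cassert>
#include "arrow/buffer.h"
#include "arrow/device.h"

void DeviceTypeExample() {
  static const uint8_t bytes[] = {1, 2, 3, 4};
  arrow::Buffer buf(bytes, sizeof(bytes));
  // The plain-pointer constructor pins the buffer to the CPU device type.
  assert(buf.device_type() == arrow::DeviceAllocationType::kCPU);
}
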
diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc
index ce8bab846d586..3dd95cb8af5c6 100644
--- a/cpp/src/arrow/buffer_test.cc
+++ b/cpp/src/arrow/buffer_test.cc
@@ -41,6 +41,7 @@ using internal::checked_cast;
 using internal::checked_pointer_cast;
 
 static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const DeviceAllocationType kMyDeviceType = DeviceAllocationType::kEXT_DEV;
 
 static const int kMyDeviceAllowCopy = 1;
 static const int kMyDeviceAllowView = 2;
@@ -70,6 +71,8 @@ class MyDevice : public Device {
     return checked_cast<const MyDevice&>(other).value_ == value_;
   }
 
+  DeviceAllocationType device_type() const override { return kMyDeviceType; }
+
   std::shared_ptr<MemoryManager> default_memory_manager() override;
 
   int value() const { return value_; }
@@ -256,6 +259,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -263,6 +267,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -271,6 +276,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), my_copy_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -280,6 +286,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), my_copy_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -290,6 +297,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -297,6 +305,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -305,6 +314,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), my_copy_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -315,6 +325,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), my_copy_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -330,6 +341,7 @@ TEST_F(TestDevice, View) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_EQ(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -338,6 +350,7 @@ TEST_F(TestDevice, View) {
   ASSERT_EQ(buffer->device(), my_view_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_EQ(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -348,6 +361,7 @@ TEST_F(TestDevice, View) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_EQ(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index 85a5156d11db2..13355dd6d05ae 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -522,6 +522,8 @@ struct ExportedArrayPrivateData : PoolAllocationMixin<ExportedArrayPrivateData>
 
   std::shared_ptr<ArrayData> data_;
 
+  RawSyncEvent sync_event_;
+
   ExportedArrayPrivateData() = default;
   ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData);
   ARROW_DISALLOW_COPY_AND_ASSIGN(ExportedArrayPrivateData);
@@ -544,7 +546,12 @@ void ReleaseExportedArray(struct ArrowArray* array) {
         << "Dictionary release callback should have marked it released";
   }
   DCHECK_NE(array->private_data, nullptr);
-  delete reinterpret_cast<ExportedArrayPrivateData*>(array->private_data);
+  auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(array->private_data);
+  if (pdata->sync_event_.sync_event != nullptr &&
+      pdata->sync_event_.release_func != nullptr) {
+    pdata->sync_event_.release_func(pdata->sync_event_.sync_event);
+  }
+  delete pdata;
 
   ArrowArrayMarkReleased(array);
 }
@@ -584,6 +591,7 @@ struct ArrayExporter {
 
     // Store owning pointer to ArrayData
     export_.data_ = data;
+    export_.sync_event_ = RawSyncEvent();
 
     return Status::OK();
   }
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,
+                          std::optional<DeviceAllocationType>* device_type,
+                          int64_t* device_id) {
+  for (const auto& buf : data.buffers) {
+    if (!buf) {
+      continue;
+    }
+
+    if (*device_type == std::nullopt) {
+      *device_type = buf->device_type();
+      *device_id = buf->device()->device_id();
+      continue;
+    }
+
+    if (buf->device_type() != *device_type) {
+      return Status::Invalid(
+          "Exporting device array with buffers on more than one device.");
+    }
+
+    if (buf->device()->device_id() != *device_id) {
+      return Status::Invalid(
+          "Exporting device array with buffers on multiple device ids.");
+    }
+  }
+
+  for (const auto& child : data.child_data) {
+    RETURN_NOT_OK(ValidateDeviceInfo(*child, device_type, device_id));
+  }
+
+  return Status::OK();
+}
+
+Result<std::pair<std::optional<DeviceAllocationType>, int64_t>> ValidateDeviceInfo(
+    const ArrayData& data) {
+  std::optional<DeviceAllocationType> device_type;
+  int64_t device_id = -1;
+  RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id));
+  return std::make_pair(device_type, device_id);
+}
+
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+                         struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) {
+  if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) {
+    return Status::Invalid(
+        "Must provide a release event function if providing a non-null event");
+  }
+
+  SchemaExportGuard guard(out_schema);
+  if (out_schema != nullptr) {
+    RETURN_NOT_OK(ExportType(*array.type(), out_schema));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array.data()));
+  if (!device_info.first) {
+    out->device_type = ARROW_DEVICE_CPU;
+  } else {
+    out->device_type = static_cast<ArrowDeviceType>(*device_info.first);
+  }
+  out->device_id = device_info.second;
+
+  ArrayExporter exporter;
+  RETURN_NOT_OK(exporter.Export(array.data()));
+  exporter.Finish(&out->array);
+
+  auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
+  pdata->sync_event_ = sync_event;
+  out->sync_event = sync_event.sync_event;
+
+  guard.Detach();
+  return Status::OK();
+}
+
+Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event,
+                               struct ArrowDeviceArray* out,
+                               struct ArrowSchema* out_schema) {
+  if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) {
+    return Status::Invalid(
+        "Must provide a release event function if providing a non-null event");
+  }
+
+  // XXX perhaps bypass ToStructArray for speed?
+  ARROW_ASSIGN_OR_RAISE(auto array, batch.ToStructArray());
+
+  SchemaExportGuard guard(out_schema);
+  if (out_schema != nullptr) {
+    // Export the schema, not the struct type, so as not to lose top-level metadata
+    RETURN_NOT_OK(ExportSchema(*batch.schema(), out_schema));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array->data()));
+  if (!device_info.first) {
+    out->device_type = ARROW_DEVICE_CPU;
+  } else {
+    out->device_type = static_cast<ArrowDeviceType>(*device_info.first);
+  }
+  out->device_id = device_info.second;
+
+  ArrayExporter exporter;
+  RETURN_NOT_OK(exporter.Export(array->data()));
+  exporter.Finish(&out->array);
+
+  auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
+  pdata->sync_event_ = sync_event;
+  out->sync_event = sync_event.sync_event;
+
+  guard.Detach();
+  return Status::OK();
+}
+
 //////////////////////////////////////////////////////////////////////////
 // C schema import
 
@@ -1242,6 +1362,7 @@ namespace {
 // The ArrowArray is released on destruction.
 struct ImportedArrayData {
   struct ArrowArray array_;
+  void* sync_event_;
 
   ImportedArrayData() {
     ArrowArrayMarkReleased(&array_);  // Initially released
@@ -1267,6 +1388,11 @@ class ImportedBuffer : public Buffer {
                  std::shared_ptr<ImportedArrayData> import)
       : Buffer(data, size), import_(std::move(import)) {}
 
+  ImportedBuffer(const uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm,
+                 DeviceAllocationType device_type,
+                 std::shared_ptr<ImportedArrayData> import)
+      : Buffer(data, size, mm, nullptr, device_type), import_(std::move(import)) {}
+
   ~ImportedBuffer() override {}
 
  protected:
@@ -1275,7 +1401,20 @@ class ImportedBuffer : public Buffer {
 
 struct ArrayImporter {
   explicit ArrayImporter(const std::shared_ptr<DataType>& type)
-      : type_(type), zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)) {}
+      : type_(type),
+        zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)),
+        device_type_(DeviceAllocationType::kCPU) {}
+
+  Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper& mapper) {
+    ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, src->device_id));
+    device_type_ = static_cast<DeviceAllocationType>(src->device_type);
+    RETURN_NOT_OK(Import(&src->array));
+    import_->sync_event_ = src->sync_event;
+    // reset internal state before next import
+    memory_mgr_.reset();
+    device_type_ = DeviceAllocationType::kCPU;
+    return Status::OK();
+  }
 
   Status Import(struct ArrowArray* src) {
     if (ArrowArrayIsReleased(src)) {
@@ -1588,7 +1727,12 @@ struct ArrayImporter {
     std::shared_ptr<Buffer>* out = &data_->buffers[buffer_id];
     auto data = reinterpret_cast<const uint8_t*>(c_struct_->buffers[buffer_id]);
    if (data != nullptr) {
-      *out = std::make_shared<ImportedBuffer>(data, buffer_size, import_);
+      if (memory_mgr_) {
+        *out = std::make_shared<ImportedBuffer>(data, buffer_size, memory_mgr_,
+                                                device_type_, import_);
+      } else {
+        *out = std::make_shared<ImportedBuffer>(data, buffer_size, import_);
+      }
     } else if (is_null_bitmap) {
       out->reset();
     } else {
@@ -1613,6 +1757,9 @@ struct ArrayImporter {
 
   // For imported null buffer pointers
   std::shared_ptr<Buffer> zero_size_buffer_;
+
+  std::shared_ptr<MemoryManager> memory_mgr_;
+  DeviceAllocationType device_type_;
 };
 
 }  // namespace
@@ -1652,6 +1799,45 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
   return ImportRecordBatch(array, *maybe_schema);
 }
 
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
+                                                 std::shared_ptr<DataType> type,
+                                                 const DeviceMemoryMapper& mapper) {
+  ArrayImporter importer(type);
+  RETURN_NOT_OK(importer.Import(array, mapper));
+  return importer.MakeArray();
+}
+
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
+                                                 struct ArrowSchema* type,
+                                                 const DeviceMemoryMapper& mapper) {
+  auto maybe_type = ImportType(type);
+  if (!maybe_type.ok()) {
+    ArrowArrayRelease(&array->array);
+    return maybe_type.status();
+  }
+  return ImportDeviceArray(array, *maybe_type, mapper);
+}
+
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
+    const DeviceMemoryMapper& mapper) {
+  auto type = struct_(schema->fields());
+  ArrayImporter importer(type);
+  RETURN_NOT_OK(importer.Import(array, mapper));
+  return importer.MakeRecordBatch(std::move(schema));
+}
+
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, struct ArrowSchema* schema,
+    const DeviceMemoryMapper& mapper) {
+  auto maybe_schema = ImportSchema(schema);
+  if (!maybe_schema.ok()) {
+    ArrowArrayRelease(&array->array);
+    return maybe_schema.status();
+  }
+  return ImportDeviceRecordBatch(array, *maybe_schema, mapper);
+}
+
 //////////////////////////////////////////////////////////////////////////
 // C stream export
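Reviewer note on the sync-event contract implemented above: ownership of the event passes to the exported structure, and ReleaseExportedArray invokes release_func exactly once when the consumer releases the array. A hedged producer-side sketch (not part of the patch; my_event and the lambda body are hypothetical stand-ins for a real event object such as a cudaEvent_t):

// Sketch: exporting an Array together with a synchronization event.
#include "arrow/c/bridge.h"

arrow::Status ExportWithEvent(const arrow::Array& array, void* my_event,
                              struct ArrowDeviceArray* out) {
  arrow::RawSyncEvent sync;
  sync.sync_event = my_event;
  // Called exactly once, from the ArrowDeviceArray release callback.
  sync.release_func = [](void* ev) {
    static_cast<void>(ev);  // e.g. destroy the event object here
  };
  return arrow::ExportDeviceArray(array, sync, out);
}
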
diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h
index 3b1a013d20dbf..92707a59729fc 100644
--- a/cpp/src/arrow/c/bridge.h
+++ b/cpp/src/arrow/c/bridge.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <functional>
 #include <memory>
 #include <string>
 
@@ -166,6 +167,135 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
 
 /// @}
 
+/// \defgroup c-data-device-interface Functions for working with the C data device
+/// interface.
+///
+/// @{
+
+/// \brief EXPERIMENTAL: Type for freeing a sync event
+///
+/// If synchronization is necessary for accessing the data on a device,
+/// a pointer to an event needs to be passed when exporting the device
+/// array. It's the responsibility of the release function for the array
+/// to release the event. Both can be null if no syncing is necessary.
+struct RawSyncEvent {
+  void* sync_event = NULL;
+  std::function<void(void*)> release_func;
+};
+
+/// \brief EXPERIMENTAL: Export C++ Array as an ArrowDeviceArray.
+///
+/// The resulting ArrowDeviceArray struct keeps the array data and buffers alive
+/// until its release callback is called by the consumer. All buffers in
+/// the provided array MUST have the same device_type, otherwise an error
+/// will be returned.
+///
+/// If a non-null sync_event is provided, then the sync_release func must also be
+/// non-null. If the sync_event is null, then the sync_release parameter is ignored.
+///
+/// \param[in] array Array object to export
+/// \param[in] sync_event A struct containing what is needed for syncing if necessary
+/// \param[out] out C struct to export the array to
+/// \param[out] out_schema optional C struct to export the array type to
+ARROW_EXPORT
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+                         struct ArrowDeviceArray* out,
+                         struct ArrowSchema* out_schema = NULLPTR);
+
+/// \brief EXPERIMENTAL: Export C++ RecordBatch as an ArrowDeviceArray.
+///
+/// The record batch is exported as if it were a struct array.
+/// The resulting ArrowDeviceArray struct keeps the record batch data and buffers alive
+/// until its release callback is called by the consumer.
+///
+/// All buffers of all columns in the record batch must have the same device_type,
+/// otherwise an error will be returned. If columns are on different devices,
+/// they should be exported using different ArrowDeviceArray instances.
+///
+/// If a non-null sync_event is provided, then the sync_release func must also be
+/// non-null. If the sync_event is null, then the sync_release parameter is ignored.
+///
+/// \param[in] batch Record batch to export
+/// \param[in] sync_event A struct containing what is needed for syncing if necessary
+/// \param[out] out C struct where to export the record batch
+/// \param[out] out_schema optional C struct where to export the record batch schema
+ARROW_EXPORT
+Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event,
+                               struct ArrowDeviceArray* out,
+                               struct ArrowSchema* out_schema = NULLPTR);
+
+using DeviceMemoryMapper =
+    std::function<Result<std::shared_ptr<MemoryManager>>(ArrowDeviceType, int64_t)>;
+
+/// \brief EXPERIMENTAL: Import C++ device array from the C data interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting array. The
+/// buffers of the Array are located on the device indicated by the device_type.
+///
+/// \param[in,out] array C data interface struct holding the array data
+/// \param[in] type type of the imported array
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported array object
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
+                                                 std::shared_ptr<DataType> type,
+                                                 const DeviceMemoryMapper& mapper);
+
+/// \brief EXPERIMENTAL: Import C++ device array and its type from the C data interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting array.
+/// The ArrowSchema struct is released, even if this function fails. The
+/// buffers of the Array are located on the device indicated by the device_type.
+///
+/// \param[in,out] array C data interface struct holding the array data
+/// \param[in,out] type C data interface struct holding the array type
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported array object
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
+                                                 struct ArrowSchema* type,
+                                                 const DeviceMemoryMapper& mapper);
+
+/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C
+/// data interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting record batch.
+/// The buffers of all columns of the record batch are located on the device
+/// indicated by the device type.
+///
+/// \param[in,out] array C data interface struct holding the record batch data
+/// \param[in] schema schema of the imported record batch
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported record batch object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
+    const DeviceMemoryMapper& mapper);
+
+/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema
+/// from the C data interface.
+///
+/// The type represented by the ArrowSchema struct must be a struct type array.
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting record batch.
+/// The ArrowSchema struct is released, even if this function fails. The buffers
+/// of all columns of the record batch are located on the device indicated by the
+/// device type.
+///
+/// \param[in,out] array C data interface struct holding the record batch data
+/// \param[in,out] schema C data interface struct holding the record batch schema
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported record batch object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, struct ArrowSchema* schema,
+    const DeviceMemoryMapper& mapper);
+
+/// @}
+
 /// \defgroup c-stream-interface Functions for working with the C data interface.
 ///
 /// @{
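Reviewer note (not part of the patch): since DeviceMemoryMapper is a plain std::function, each consumer decides which devices it accepts. A minimal sketch of a CPU-only mapper that could be handed to the import functions declared above:

// Sketch: the simplest possible DeviceMemoryMapper, accepting only CPU
// arrays and rejecting everything else.
#include "arrow/c/bridge.h"
#include "arrow/device.h"

arrow::Result<std::shared_ptr<arrow::MemoryManager>> CpuOnlyMapper(
    ArrowDeviceType device_type, int64_t device_id) {
  if (device_type != ARROW_DEVICE_CPU) {
    return arrow::Status::NotImplemented("Only CPU arrays are supported");
  }
  return arrow::default_cpu_memory_manager();
}

Anything more capable (for example, the CUDA-aware mapper added in cuda_memory.cc below) follows the same shape: switch on device_type, then pick a memory manager for the given device_id.
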
diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc
index 5fe7b653c8970..5c7de8e4a0783 100644
--- a/cpp/src/arrow/c/bridge_test.cc
+++ b/cpp/src/arrow/c/bridge_test.cc
@@ -565,6 +565,15 @@ struct ArrayExportChecker {
       ASSERT_EQ(c_export->children, nullptr);
     }
   }
+
+  void operator()(struct ArrowDeviceArray* c_export, const ArrayData& expected_data,
+                  const ArrowDeviceType device_type, const int64_t device_id,
+                  const void* sync_event) {
+    ASSERT_EQ(c_export->device_type, device_type);
+    ASSERT_EQ(c_export->device_id, device_id);
+    ASSERT_EQ(c_export->sync_event, sync_event);
+    this->operator()(&c_export->array, expected_data);
+  }
 };
 
 struct RecordBatchExportChecker {
@@ -592,6 +601,15 @@ struct RecordBatchExportChecker {
       ASSERT_EQ(c_export->children, nullptr);
     }
   }
+
+  void operator()(struct ArrowDeviceArray* c_export, const RecordBatch& expected_data,
+                  const ArrowDeviceType device_type, const int64_t device_id,
+                  const void* sync_event) {
+    ASSERT_EQ(c_export->device_type, device_type);
+    ASSERT_EQ(c_export->device_id, device_id);
+    ASSERT_EQ(c_export->sync_event, sync_event);
+    this->operator()(&c_export->array, expected_data);
+  }
 };
 
 class TestArrayExport : public ::testing::Test {
@@ -1112,6 +1130,392 @@ TEST_F(TestArrayExport, ExportRecordBatch) {
   }
 }
 
+////////////////////////////////////////////////////////////////////////////
+// Device Array Export Tests
+
+static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const ArrowDeviceType kMyDeviceType = ARROW_DEVICE_EXT_DEV;
+
+class MyBuffer final : public MutableBuffer {
+ public:
+  using MutableBuffer::MutableBuffer;
+
+  ~MyBuffer() { default_memory_pool()->Free(const_cast<uint8_t*>(data_), size_); }
+};
+
+class MyMemoryManager : public CPUMemoryManager {
+ public:
+  explicit MyMemoryManager(const std::shared_ptr<Device>& device)
+      : CPUMemoryManager(device, default_memory_pool()) {}
+
+  Result<std::unique_ptr<Buffer>> AllocateBuffer(int64_t size) override {
+    uint8_t* data;
+    RETURN_NOT_OK(pool_->Allocate(size, &data));
+    return std::make_unique<MyBuffer>(data, size, shared_from_this());
+  }
+
+ protected:
+  Result<std::shared_ptr<Buffer>> CopyBufferFrom(
+      const std::shared_ptr<Buffer>& buf,
+      const std::shared_ptr<MemoryManager>& from) override {
+    return CopyNonOwnedFrom(*buf, from);
+  }
+  Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
+      const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override {
+    if (!from->is_cpu()) {
+      return nullptr;
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf.size()));
+    if (buf.size() > 0) {
+      memcpy(dest->mutable_data(), buf.data(), static_cast<size_t>(buf.size()));
+    }
+    return std::move(dest);
+  }
+};
+
+class MyDevice : public Device {
+ public:
+  explicit MyDevice(int value) : Device(true), value_(value) {}
+  const char* type_name() const override { return kMyDeviceTypeName; }
+  std::string ToString() const override { return kMyDeviceTypeName; }
+  bool Equals(const Device& other) const override {
+    if (other.type_name() != kMyDeviceTypeName || other.device_type() != device_type()) {
+      return false;
+    }
+    return checked_cast<const MyDevice&>(other).value_ == value_;
+  }
+  DeviceAllocationType device_type() const override {
+    return static_cast<DeviceAllocationType>(kMyDeviceType);
+  }
+  int64_t device_id() const override { return value_; }
+  std::shared_ptr<MemoryManager> default_memory_manager() override {
+    return std::make_shared<MyMemoryManager>(shared_from_this());
+  }
+
+ protected:
+  int value_;
+};
+
+class TestDeviceArrayExport : public ::testing::Test {
+ public:
+  void SetUp() override { pool_ = default_memory_pool(); }
+
+  static Result<std::shared_ptr<ArrayData>> ToDeviceData(
+      const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+    arrow::BufferVector buffers;
+    for (const auto& buf : data.buffers) {
+      if (buf) {
+        ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm));
+        buffers.push_back(dest);
+      } else {
+        buffers.push_back(nullptr);
+      }
+    }
+
+    arrow::ArrayDataVector children;
+    for (const auto& child : data.child_data) {
+      ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child));
+      children.push_back(dest);
+    }
+
+    return ArrayData::Make(data.type, data.length, buffers, children, data.null_count,
+                           data.offset);
+  }
+
+  static Result<std::shared_ptr<Array>> ToDevice(
+      const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+    ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data));
+    return MakeArray(result);
+  }
+
+  template <typename ArrayFactory>
+  static std::function<Result<std::shared_ptr<Array>>()> ToDeviceFactory(
+      const std::shared_ptr<MemoryManager>& mm, ArrayFactory&& factory) {
+    return [&]() { return ToDevice(mm, *factory()->data()); };
+  }
+
+  static std::function<Result<std::shared_ptr<Array>>()> JSONArrayFactory(
+      const std::shared_ptr<MemoryManager>& mm, std::shared_ptr<DataType> type,
+      const char* json) {
+    return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); };
+  }
+
+  template <typename ArrayFactory, typename ExportCheckFunc>
+  void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) {
+    auto orig_bytes = pool_->bytes_allocated();
+
+    std::shared_ptr<Array> arr;
+    ASSERT_OK_AND_ASSIGN(arr, ToResult(factory()));
+    ARROW_SCOPED_TRACE("type = ", arr->type()->ToString(),
+                       ", array data = ", arr->ToString());
+    const ArrayData& data = *arr->data();  // non-owning reference
+    struct ArrowDeviceArray c_export;
+    ASSERT_OK(ExportDeviceArray(*arr, {nullptr, nullptr}, &c_export));
+
+    ArrayExportGuard guard(&c_export.array);
+    auto new_bytes = pool_->bytes_allocated();
+    ASSERT_GT(new_bytes, orig_bytes);
+
+    // Release the shared_ptr, underlying data should be held alive
+    arr.reset();
+    ASSERT_EQ(pool_->bytes_allocated(), new_bytes);
+    check_func(&c_export, data, kMyDeviceType, 1, nullptr);
+
+    // Release the ArrowArray, underlying data should be destroyed
+    guard.Release();
+    ASSERT_EQ(pool_->bytes_allocated(), orig_bytes);
+  }
+
+  template <typename ArrayFactory>
+  void TestNested(ArrayFactory&& factory) {
+    ArrayExportChecker checker;
+    TestWithArrayFactory(std::forward<ArrayFactory>(factory), checker);
+  }
+
+  void TestNested(const std::shared_ptr<MemoryManager>& mm,
+                  const std::shared_ptr<DataType>& type, const char* json) {
+    TestNested(JSONArrayFactory(mm, type, json));
+  }
+
+  template <typename ArrayFactory>
+  void TestPrimitive(ArrayFactory&& factory) {
+    TestNested(std::forward<ArrayFactory>(factory));
+  }
+
+  void TestPrimitive(const std::shared_ptr<MemoryManager>& mm,
+                     const std::shared_ptr<DataType>& type, const char* json) {
+    TestNested(mm, type, json);
+  }
+
+ protected:
+  MemoryPool* pool_;
+};
+
+TEST_F(TestDeviceArrayExport, Primitive) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestPrimitive(mm, int8(), "[1, 2, null, -3]");
+  TestPrimitive(mm, int16(), "[1, 2, -3]");
+  TestPrimitive(mm, int32(), "[1, 2, null, -3]");
+  TestPrimitive(mm, int64(), "[1, 2, -3]");
+  TestPrimitive(mm, uint8(), "[1, 2, 3]");
+  TestPrimitive(mm, uint16(), "[1, 2, null, 3]");
+  TestPrimitive(mm, uint32(), "[1, 2, 3]");
+  TestPrimitive(mm, uint64(), "[1, 2, null, 3]");
+
+  TestPrimitive(mm, boolean(), "[true, false, null]");
+
+  TestPrimitive(mm, float32(), "[1.5, null]");
+  TestPrimitive(mm, float64(), "[1.5, null]");
+
+  TestPrimitive(mm, fixed_size_binary(3), R"(["foo", "bar", null])");
+  TestPrimitive(mm, binary(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, large_binary(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, utf8(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, large_utf8(), R"(["foo", "bar", null])");
+
+  TestPrimitive(mm, decimal(16, 4), R"(["1234.5670", null])");
+  TestPrimitive(mm, decimal256(16, 4), R"(["1234.5670", null])");
+
+  TestPrimitive(mm, month_day_nano_interval(), R"([[-1, 5, 20], null])");
+}
+
+TEST_F(TestDeviceArrayExport, PrimitiveSliced) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  auto factory = [=]() {
+    return (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null, -3]")->data()))
+        ->Slice(1, 2);
+  };
+  TestPrimitive(factory);
+}
+
+TEST_F(TestDeviceArrayExport, Temporal) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* json = "[1, 2, null, 42]";
+  TestPrimitive(mm, date32(), json);
+  TestPrimitive(mm, date64(), json);
+  TestPrimitive(mm, time32(TimeUnit::SECOND), json);
+  TestPrimitive(mm, time32(TimeUnit::MILLI), json);
+  TestPrimitive(mm, time64(TimeUnit::MICRO), json);
+  TestPrimitive(mm, time64(TimeUnit::NANO), json);
+  TestPrimitive(mm, duration(TimeUnit::SECOND), json);
+  TestPrimitive(mm, duration(TimeUnit::MILLI), json);
+  TestPrimitive(mm, duration(TimeUnit::MICRO), json);
+  TestPrimitive(mm, duration(TimeUnit::NANO), json);
+  TestPrimitive(mm, month_interval(), json);
+
+  TestPrimitive(mm, day_time_interval(), "[[7, 600], null]");
+
+  json = R"(["1970-01-01","2000-02-29","1900-02-28"])";
+  TestPrimitive(mm, timestamp(TimeUnit::SECOND), json);
+  TestPrimitive(mm, timestamp(TimeUnit::SECOND, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MILLI), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MILLI, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MICRO), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MICRO, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::NANO), json);
+  TestPrimitive(mm, timestamp(TimeUnit::NANO, "Europe/Paris"), json);
+}
+
+TEST_F(TestDeviceArrayExport, List) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestNested(mm, list(int8()), "[[1, 2], [3, null], null]");
+  TestNested(mm, large_list(uint16()), "[[1, 2], [3, null], null]");
+  TestNested(mm, fixed_size_list(int64(), 2), "[[1, 2], [3, null], null]");
+
+  TestNested(mm, list(large_list(int32())), "[[[1, 2], [3], null], null]");
+}
+
+TEST_F(TestDeviceArrayExport, ListSliced) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  {
+    auto factory = [=]() {
+      return (*ToDevice(
+                  mm, *ArrayFromJSON(list(int8()), "[[1, 2], [3, null], [4, 5, 6], null]")
+                           ->data()))
+          ->Slice(1, 2);
+    };
+    TestNested(factory);
+  }
+  {
+    auto factory = [=]() {
+      auto values =
+          (*ToDevice(mm,
+                     *ArrayFromJSON(int16(), "[1, 2, 3, 4, null, 5, 6, 7, 8]")->data()))
+              ->Slice(1, 6);
+      auto offsets = (*ToDevice(mm, *ArrayFromJSON(int32(), "[0, 2, 3, 5, 6]")->data()))
+                         ->Slice(2, 4);
+      return ListArray::FromArrays(*offsets, *values);
+    };
+    TestNested(factory);
+  }
+}
+
+TEST_F(TestDeviceArrayExport, Struct) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* data = R"([[1, "foo"], [2, null]])";
+  auto type = struct_({field("a", int8()), field("b", utf8())});
+  TestNested(mm, type, data);
+}
+
+TEST_F(TestDeviceArrayExport, Map) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* json = R"([[[1, "foo"], [2, null]], [[3, "bar"]]])";
+  TestNested(mm, map(int8(), utf8()), json);
+  TestNested(mm, map(int8(), utf8(), /*keys_sorted=*/true), json);
+}
+
+TEST_F(TestDeviceArrayExport, Union) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* data = "[null, [42, 1], [43, true], [42, null], [42, 2]]";
+  // Dense
+  auto field_a = field("a", int8());
+  auto field_b = field("b", boolean(), /*nullable=*/false);
+  auto type = dense_union({field_a, field_b}, {42, 43});
+  TestNested(mm, type, data);
+  // Sparse
+  field_a = field("a", int8(), /*nullable=*/false);
+  field_b = field("b", boolean());
+  type = sparse_union({field_a, field_b}, {42, 43});
+  TestNested(mm, type, data);
+}
+
+TEST_F(TestDeviceArrayExport, Extension) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestPrimitive(ToDeviceFactory(mm, ExampleUuid));
+  TestPrimitive(ToDeviceFactory(mm, ExampleSmallint));
+  TestPrimitive(ToDeviceFactory(mm, ExampleComplex128));
+}
+
+TEST_F(TestDeviceArrayExport, ExportArrayAndType) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  struct ArrowSchema c_schema {};
+  struct ArrowDeviceArray c_array {};
+  SchemaExportGuard schema_guard(&c_schema);
+  ArrayExportGuard array_guard(&c_array.array);
+
+  auto array = ToDevice(mm, *ArrayFromJSON(int8(), "[1, 2, 3]")->data()).ValueOrDie();
+  ASSERT_OK(ExportDeviceArray(*array, {nullptr, nullptr}, &c_array, &c_schema));
+  const ArrayData& data = *array->data();
+  array.reset();
+  ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema));
+  ASSERT_FALSE(ArrowArrayIsReleased(&c_array.array));
+  ASSERT_EQ(c_schema.format, std::string("c"));
+  ASSERT_EQ(c_schema.n_children, 0);
+  ArrayExportChecker checker{};
+  checker(&c_array, data, kMyDeviceType, 1, nullptr);
+}
+
+TEST_F(TestDeviceArrayExport, ExportRecordBatch) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  struct ArrowSchema c_schema {};
+  struct ArrowDeviceArray c_array {};
+
+  auto schema = ::arrow::schema(
+      {field("ints", int16()), field("bools", boolean(), /*nullable=*/false)});
+  schema = schema->WithMetadata(key_value_metadata(kMetadataKeys2, kMetadataValues2));
+  auto arr0 = ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null]")->data()).ValueOrDie();
+  auto arr1 = ToDevice(mm, *ArrayFromJSON(boolean(), "[false, true, false]")->data())
+                  .ValueOrDie();
+
+  auto batch_factory = [&]() { return RecordBatch::Make(schema, 3, {arr0, arr1}); };
+
+  {
+    auto batch = batch_factory();
+
+    ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, &c_schema));
+    SchemaExportGuard schema_guard(&c_schema);
+    ArrayExportGuard array_guard(&c_array.array);
+    RecordBatchExportChecker checker{};
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+
+    // create batch anew, with the same buffer pointers
+    batch = batch_factory();
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+  }
+  {
+    // Check one can export both schema and record batch at once
+    auto batch = batch_factory();
+
+    ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, &c_schema));
+    SchemaExportGuard schema_guard(&c_schema);
+    ArrayExportGuard array_guard(&c_array.array);
+    ASSERT_EQ(c_schema.format, std::string("+s"));
+    ASSERT_EQ(c_schema.n_children, 2);
+    ASSERT_NE(c_schema.metadata, nullptr);
+    ASSERT_EQ(kEncodedMetadata2,
+              std::string(c_schema.metadata, kEncodedMetadata2.size()));
+    RecordBatchExportChecker checker{};
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+
+    // Create batch anew, with the same buffer pointers
+    batch = batch_factory();
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+  }
+}
+
 ////////////////////////////////////////////////////////////////////////////
 // Schema import tests
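Reviewer note (not part of the patch): the tests above convert between ArrowDeviceType and DeviceAllocationType with static_cast, which is only sound because the enumerators added in device.h below mirror the ARROW_DEVICE_* constants of arrow/c/abi.h one for one. A compile-time sanity sketch a downstream could keep next to the enum:

// Sketch: catch any drift between the C++ enum and the C ABI constants.
#include "arrow/c/abi.h"
#include "arrow/device.h"

static_assert(static_cast<ArrowDeviceType>(arrow::DeviceAllocationType::kCPU) ==
              ARROW_DEVICE_CPU);
static_assert(static_cast<ArrowDeviceType>(arrow::DeviceAllocationType::kCUDA) ==
              ARROW_DEVICE_CUDA);
static_assert(static_cast<ArrowDeviceType>(arrow::DeviceAllocationType::kEXT_DEV) ==
              ARROW_DEVICE_EXT_DEV);
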
diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h
index 67c62a5181f28..9cc68fe8c82ce 100644
--- a/cpp/src/arrow/device.h
+++ b/cpp/src/arrow/device.h
@@ -29,6 +29,24 @@
 
 namespace arrow {
 
+/// \brief EXPERIMENTAL: Device type enum which matches up with C Data Device types
+enum class DeviceAllocationType : char {
+  kCPU = 1,
+  kCUDA = 2,
+  kCUDA_HOST = 3,
+  kOPENCL = 4,
+  kVULKAN = 7,
+  kMETAL = 8,
+  kVPI = 9,
+  kROCM = 10,
+  kROCM_HOST = 11,
+  kEXT_DEV = 12,
+  kCUDA_MANAGED = 13,
+  kONEAPI = 14,
+  kWEBGPU = 15,
+  kHEXAGON = 16,
+};
+
 class MemoryManager;
 
 /// \brief EXPERIMENTAL: Abstract interface for hardware devices
@@ -58,6 +76,12 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this<Device>,
   /// \brief Whether this instance points to the same device as another one.
   virtual bool Equals(const Device&) const = 0;
 
+  /// \brief A device ID to identify this device if there are multiple of this type.
+  ///
+  /// If there is no "device_id" equivalent (such as for the main CPU device on
+  /// non-NUMA systems) returns -1.
+  virtual int64_t device_id() const { return -1; }
+
   /// \brief Whether this device is the main CPU device.
   ///
   /// This shorthand method is very useful when deciding whether a memory address
@@ -71,6 +95,9 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this<Device>,
   /// MemoryManager instances with non-default parameters.
   virtual std::shared_ptr<MemoryManager> default_memory_manager() = 0;
 
+  /// \brief Return the DeviceAllocationType of this device
+  virtual DeviceAllocationType device_type() const = 0;
+
  protected:
   ARROW_DISALLOW_COPY_AND_ASSIGN(Device);
   explicit Device(bool is_cpu = false) : is_cpu_(is_cpu) {}
@@ -172,6 +199,7 @@ class ARROW_EXPORT CPUDevice : public Device {
   const char* type_name() const override;
   std::string ToString() const override;
   bool Equals(const Device&) const override;
+  DeviceAllocationType device_type() const override { return DeviceAllocationType::kCPU; }
 
   std::shared_ptr<MemoryManager> default_memory_manager() override;
 
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index f754c07d13c89..869ea6453ccda 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -384,7 +384,8 @@ Result<std::shared_ptr<Buffer>> CudaMemoryManager::ViewBufferTo(
   if (to->is_cpu()) {
     // Device-on-CPU view
     ARROW_ASSIGN_OR_RAISE(auto address, GetHostAddress(buf->address()));
-    return std::make_shared<Buffer>(address, buf->size(), to, buf);
+    return std::make_shared<Buffer>(address, buf->size(), to, buf,
+                                    DeviceAllocationType::kCUDA_HOST);
   }
   return nullptr;
 }
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 0115ed19a103d..a1b95c7b4181d 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -92,6 +92,10 @@ class ARROW_EXPORT CudaDevice : public Device {
   std::string ToString() const override;
   bool Equals(const Device&) const override;
   std::shared_ptr<MemoryManager> default_memory_manager() override;
+  DeviceAllocationType device_type() const override {
+    return DeviceAllocationType::kCUDA;
+  }
+  int64_t device_id() const override { return device_number(); }
 
   /// \brief Return a CudaDevice instance for a particular device
   /// \param[in] device_number the CUDA device number
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 297e4dcf71e44..860c6311d7b2f 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -198,6 +198,11 @@ Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
   return handle;
 }
 
+CudaHostBuffer::CudaHostBuffer(uint8_t* data, const int64_t size)
+    : MutableBuffer(data, size) {
+  device_type_ = DeviceAllocationType::kCUDA_HOST;
+}
+
 CudaHostBuffer::~CudaHostBuffer() {
   auto maybe_manager = CudaDeviceManager::Instance();
   ARROW_CHECK_OK(maybe_manager.status());
@@ -480,5 +485,21 @@ Result<uintptr_t> GetHostAddress(uintptr_t device_ptr) {
   return static_cast<uintptr_t>(ptr);
 }
 
+Result<std::shared_ptr<MemoryManager>> DefaultMemoryMapper(ArrowDeviceType device_type,
+                                                           int64_t device_id) {
+  switch (device_type) {
+    case ARROW_DEVICE_CPU:
+      return default_cpu_memory_manager();
+    case ARROW_DEVICE_CUDA:
+    case ARROW_DEVICE_CUDA_HOST:
+    case ARROW_DEVICE_CUDA_MANAGED: {
+      ARROW_ASSIGN_OR_RAISE(auto device, arrow::cuda::CudaDevice::Make(device_id));
+      return device->default_memory_manager();
+    }
+    default:
+      return Status::NotImplemented("memory manager not implemented for device");
+  }
+}
+
 }  // namespace cuda
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 18c23a507805a..d323bef03494e 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -21,6 +21,7 @@
 #include <memory>
 
 #include "arrow/buffer.h"
+#include "arrow/c/abi.h"
 #include "arrow/io/concurrency.h"
 #include "arrow/type_fwd.h"
 
@@ -110,7 +111,8 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
 /// \brief Device-accessible CPU memory created using cudaHostAlloc
 class ARROW_EXPORT CudaHostBuffer : public MutableBuffer {
  public:
-  using MutableBuffer::MutableBuffer;
+  CudaHostBuffer(uint8_t* data, const int64_t size);
+
   ~CudaHostBuffer();
 
   /// \brief Return a device address the GPU can read this memory from.
@@ -258,5 +260,9 @@ Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
 ARROW_EXPORT
 Result<uintptr_t> GetHostAddress(uintptr_t device_ptr);
 
+ARROW_EXPORT
+Result<std::shared_ptr<MemoryManager>> DefaultMemoryMapper(ArrowDeviceType device_type,
+                                                           int64_t device_id);
+
 }  // namespace cuda
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc
index aac45d13831e5..6d392213e231f 100644
--- a/cpp/src/arrow/gpu/cuda_test.cc
+++ b/cpp/src/arrow/gpu/cuda_test.cc
@@ -364,6 +364,7 @@ TEST_F(TestCudaHostBuffer, AllocateGlobal) {
 
   ASSERT_TRUE(host_buffer->is_cpu());
   ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_);
+  ASSERT_EQ(host_buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
 
   ASSERT_OK_AND_ASSIGN(auto device_address, host_buffer->GetDeviceAddress(context_));
   ASSERT_NE(device_address, 0);
@@ -376,6 +377,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
 
   ASSERT_TRUE(host_buffer->is_cpu());
   ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_);
+  ASSERT_EQ(host_buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
 
   // Try to view the host buffer on the device. This should correspond to
   // GetDeviceAddress() in the previous test.
@@ -385,6 +387,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
   ASSERT_NE(device_buffer->address(), 0);
   ASSERT_EQ(device_buffer->size(), host_buffer->size());
   ASSERT_EQ(device_buffer->parent(), host_buffer);
+  ASSERT_EQ(device_buffer->device_type(), DeviceAllocationType::kCUDA);
 
   // View back the device buffer on the CPU. This should roundtrip.
   ASSERT_OK_AND_ASSIGN(auto buffer, Buffer::View(device_buffer, cpu_mm_));
@@ -393,6 +396,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
   ASSERT_EQ(buffer->address(), host_buffer->address());
   ASSERT_EQ(buffer->size(), host_buffer->size());
   ASSERT_EQ(buffer->parent(), device_buffer);
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
 }
 
 // ------------------------------------------------------------------------
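Reviewer note (not part of the patch): end to end, a CUDA-aware consumer can hand the new arrow::cuda::DefaultMemoryMapper straight to the import entry points. A minimal sketch, assuming the producer has filled in the two C structs:

// Sketch: importing a record batch whose buffers may live on a CUDA device,
// using the DefaultMemoryMapper added in cuda_memory.cc/h above.
#include "arrow/c/bridge.h"
#include "arrow/gpu/cuda_memory.h"

arrow::Result<std::shared_ptr<arrow::RecordBatch>> ImportFromProducer(
    struct ArrowDeviceArray* array, struct ArrowSchema* schema) {
  // Both C structs are consumed by the import call (moved, or released on error).
  return arrow::ImportDeviceRecordBatch(array, schema,
                                        arrow::cuda::DefaultMemoryMapper);
}
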