From ab9824935c8cc6fe5dbc77dff4f2055720440951 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 5 Jul 2023 13:19:17 -0400 Subject: [PATCH 01/16] GH-36488: [C++] Import/Export ArrowDeviceArray --- cpp/src/arrow/buffer.h | 15 ++- cpp/src/arrow/buffer_test.cc | 14 +++ cpp/src/arrow/c/bridge.cc | 179 +++++++++++++++++++++++++++++- cpp/src/arrow/c/bridge.h | 129 +++++++++++++++++++++ cpp/src/arrow/device.h | 31 +++++- cpp/src/arrow/gpu/cuda_context.cc | 4 +- cpp/src/arrow/gpu/cuda_context.h | 2 + cpp/src/arrow/gpu/cuda_memory.cc | 5 + cpp/src/arrow/gpu/cuda_memory.h | 3 +- cpp/src/arrow/gpu/cuda_test.cc | 4 + 10 files changed, 377 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 65f1abda16106..4fc9d94c18229 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -57,18 +57,25 @@ class ARROW_EXPORT Buffer { /// /// \note The passed memory must be kept alive through some other means Buffer(const uint8_t* data, int64_t size) - : is_mutable_(false), is_cpu_(true), data_(data), size_(size), capacity_(size) { + : is_mutable_(false), is_cpu_(true), data_(data), size_(size), capacity_(size), device_type_(DeviceType::CPU) { SetMemoryManager(default_cpu_memory_manager()); } Buffer(const uint8_t* data, int64_t size, std::shared_ptr mm, - std::shared_ptr parent = NULLPTR) + std::shared_ptr parent = NULLPTR, DeviceType device_type = DeviceType::UNKNOWN) : is_mutable_(false), data_(data), size_(size), capacity_(size), parent_(std::move(parent)) { + // will set device_type from the memory manager SetMemoryManager(std::move(mm)); + // if a device type is specified, use that instead. for example: + // CUDA_HOST. The CudaMemoryManager will set device_type_ to CUDA, + // but you can specify CUDA_HOST as the device type to override it. + if (device_type != DeviceType::UNKNOWN) { + device_type_ = device_type; + } } Buffer(uintptr_t address, int64_t size, std::shared_ptr mm, @@ -240,6 +247,8 @@ class ARROW_EXPORT Buffer { const std::shared_ptr& memory_manager() const { return memory_manager_; } + DeviceType device_type() const { return device_type_; } + std::shared_ptr parent() const { return parent_; } /// \brief Get a RandomAccessFile for reading a buffer @@ -294,6 +303,7 @@ class ARROW_EXPORT Buffer { const uint8_t* data_; int64_t size_; int64_t capacity_; + DeviceType device_type_; // null by default, but may be set std::shared_ptr parent_; @@ -309,6 +319,7 @@ class ARROW_EXPORT Buffer { void SetMemoryManager(std::shared_ptr mm) { memory_manager_ = std::move(mm); is_cpu_ = memory_manager_->is_cpu(); + device_type_ = memory_manager_->device()->device_type(); } private: diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc index ce8bab846d586..ffae615ae245a 100644 --- a/cpp/src/arrow/buffer_test.cc +++ b/cpp/src/arrow/buffer_test.cc @@ -41,6 +41,7 @@ using internal::checked_cast; using internal::checked_pointer_cast; static const char kMyDeviceTypeName[] = "arrowtest::MyDevice"; +static const DeviceType kMyDeviceType = DeviceType::EXT_DEV; static const int kMyDeviceAllowCopy = 1; static const int kMyDeviceAllowView = 2; @@ -70,6 +71,8 @@ class MyDevice : public Device { return checked_cast(other).value_ == value_; } + DeviceType device_type() const override { return kMyDeviceType; } + std::shared_ptr default_memory_manager() override; int value() const { return value_; } @@ -256,6 +259,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), cpu_src_->address()); + ASSERT_EQ(buffer->device_type(), DeviceType::CPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -263,6 +267,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), cpu_src_->address()); + ASSERT_EQ(buffer->device_type(), DeviceType::CPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -271,6 +276,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), my_copy_device_); ASSERT_FALSE(buffer->is_cpu()); ASSERT_NE(buffer->address(), cpu_src_->address()); + ASSERT_EQ(buffer->device_type(), kMyDeviceType); #ifdef NDEBUG ASSERT_EQ(buffer->data(), nullptr); #endif @@ -280,6 +286,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), my_copy_device_); ASSERT_FALSE(buffer->is_cpu()); ASSERT_NE(buffer->address(), cpu_src_->address()); + ASSERT_EQ(buffer->device_type(), kMyDeviceType); #ifdef NDEBUG ASSERT_EQ(buffer->data(), nullptr); #endif @@ -290,6 +297,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), my_copy_src_->address()); + ASSERT_EQ(buffer->device_type(), DeviceType::CPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -297,6 +305,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), my_copy_src_->address()); + ASSERT_EQ(buffer->device_type(), DeviceType::CPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -305,6 +314,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), my_copy_device_); ASSERT_FALSE(buffer->is_cpu()); ASSERT_NE(buffer->address(), my_copy_src_->address()); + ASSERT_EQ(buffer->device_type(), kMyDeviceType); #ifdef NDEBUG ASSERT_EQ(buffer->data(), nullptr); #endif @@ -315,6 +325,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), my_copy_device_); ASSERT_FALSE(buffer->is_cpu()); ASSERT_NE(buffer->address(), my_copy_src_->address()); + ASSERT_EQ(buffer->device_type(), kMyDeviceType); #ifdef NDEBUG ASSERT_EQ(buffer->data(), nullptr); #endif @@ -330,6 +341,7 @@ TEST_F(TestDevice, View) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_EQ(buffer->address(), cpu_src_->address()); + ASSERT_EQ(buffer->device_type(), DeviceType::CPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -338,6 +350,7 @@ TEST_F(TestDevice, View) { ASSERT_EQ(buffer->device(), my_view_device_); ASSERT_FALSE(buffer->is_cpu()); ASSERT_EQ(buffer->address(), cpu_src_->address()); + ASSERT_EQ(buffer->device_type(), kMyDeviceType); #ifdef NDEBUG ASSERT_EQ(buffer->data(), nullptr); #endif @@ -348,6 +361,7 @@ TEST_F(TestDevice, View) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_EQ(buffer->address(), my_copy_src_->address()); + ASSERT_EQ(buffer->device_type(), DeviceType::CPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 85a5156d11db2..ca3a0de654c6f 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -522,6 +522,9 @@ struct ExportedArrayPrivateData : PoolAllocationMixin std::shared_ptr data_; + ReleaseEventFunc sync_release_; + void* sync_event_; + ExportedArrayPrivateData() = default; ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData); ARROW_DISALLOW_COPY_AND_ASSIGN(ExportedArrayPrivateData); @@ -544,7 +547,11 @@ void ReleaseExportedArray(struct ArrowArray* array) { << "Dictionary release callback should have marked it released"; } DCHECK_NE(array->private_data, nullptr); - delete reinterpret_cast(array->private_data); + auto* pdata = reinterpret_cast(array->private_data); + if (pdata->sync_event_ != nullptr && pdata->sync_release_ != nullptr) { + pdata->sync_release_(pdata->sync_event_); + } + delete pdata; ArrowArrayMarkReleased(array); } @@ -584,6 +591,8 @@ struct ArrayExporter { // Store owning pointer to ArrayData export_.data_ = data; + export_.sync_event_ = nullptr; + export_.sync_release_ = nullptr; return Status::OK(); } @@ -663,6 +672,107 @@ Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out, return Status::OK(); } +////////////////////////////////////////////////////////////////////////// +// C device arrays + +Result> validate_device_info(const ArrayData& data) { + DeviceType device_type = DeviceType::UNKNOWN; + int64_t device_id = -1; + + for (const auto& buf : data.buffers) { + if (!buf) { + // some buffers might be null + // for example, the null bitmap is optional if there's no nulls + continue; + } + + if (device_type == DeviceType::UNKNOWN) { + device_type = buf->device_type(); + device_id = buf->device()->device_id(); + continue; + } + + if (buf->device_type() != device_type) { + return Status::Invalid("exporting device array with buffers on more than one device."); + } + + if (buf->device()->device_id() != device_id) { + return Status::Invalid("exporting device array with buffers on multiple device ids."); + } + } + + // recursively check the children + auto info = std::make_pair(device_type, device_id); + for (const auto& child : data.child_data) { + ARROW_ASSIGN_OR_RAISE(auto device_info, validate_device_info(*child)); + if (info != device_info) { + return Status::Invalid("exporting device array with buffers on more than one device."); + } + } + + return info; +} + +Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc sync_release, + struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { + if (sync_event != nullptr && sync_release == nullptr) { + return Status::Invalid("must provide a release event function if providing a non-null event"); + } + + SchemaExportGuard guard(out_schema); + if (out_schema != nullptr) { + RETURN_NOT_OK(ExportType(*array.type(), out_schema)); + } + + ARROW_ASSIGN_OR_RAISE(auto device_info, validate_device_info(*array.data())); + out->device_type = static_cast(device_info.first); + out->device_id = device_info.second; + + ArrayExporter exporter; + RETURN_NOT_OK(exporter.Export(array.data())); + exporter.Finish(&out->array); + + auto* pdata = reinterpret_cast(out->array.private_data); + pdata->sync_event_ = sync_event; + pdata->sync_release_ = sync_release; + out->sync_event = sync_event; + + guard.Detach(); + return Status::OK(); +} + +Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, ReleaseEventFunc sync_release, + struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { + if (sync_event != nullptr && sync_release == nullptr) { + return Status::Invalid("must provide a release event function if providing a non-null event"); + } + + // XXX perhaps bypass ToStructArray for speed? + ARROW_ASSIGN_OR_RAISE(auto array, batch.ToStructArray()); + + SchemaExportGuard guard(out_schema); + if (out_schema != nullptr) { + // Export the schema, not the struct type, so as not to lose top-level metadata + RETURN_NOT_OK(ExportSchema(*batch.schema(), out_schema)); + } + + ARROW_ASSIGN_OR_RAISE(auto device_info, validate_device_info(*array->data())); + out->device_type = static_cast(device_info.first); + out->device_id = device_info.second; + + ArrayExporter exporter; + RETURN_NOT_OK(exporter.Export(array->data())); + exporter.Finish(&out->array); + + auto* pdata = reinterpret_cast(out->array.private_data); + pdata->sync_event_ = sync_event; + pdata->sync_release_ = sync_release; + out->sync_event = sync_event; + + guard.Detach(); + return Status::OK(); +} + ////////////////////////////////////////////////////////////////////////// // C schema import @@ -1242,6 +1352,7 @@ namespace { // The ArrowArray is released on destruction. struct ImportedArrayData { struct ArrowArray array_; + void* sync_event_; ImportedArrayData() { ArrowArrayMarkReleased(&array_); // Initially released @@ -1267,6 +1378,12 @@ class ImportedBuffer : public Buffer { std::shared_ptr import) : Buffer(data, size), import_(std::move(import)) {} + ImportedBuffer(const uint8_t* data, int64_t size, + std::shared_ptr mm, + DeviceType device_type, + std::shared_ptr import) + : Buffer(data, size, mm, nullptr, device_type), import_(std::move(import)) {} + ~ImportedBuffer() override {} protected: @@ -1275,7 +1392,16 @@ class ImportedBuffer : public Buffer { struct ArrayImporter { explicit ArrayImporter(const std::shared_ptr& type) - : type_(type), zero_size_buffer_(std::make_shared(kZeroSizeArea, 0)) {} + : type_(type), zero_size_buffer_(std::make_shared(kZeroSizeArea, 0)), device_type_(DeviceType::CPU) {} + + Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMgr& mapper) { + ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper.get_manager(src->device_type, src->device_id)); + device_type_ = static_cast(src->device_type); + RETURN_NOT_OK(Import(&src->array)); + import_->sync_event_ = src->sync_event; + memory_mgr_.reset(); + return Status::OK(); + } Status Import(struct ArrowArray* src) { if (ArrowArrayIsReleased(src)) { @@ -1588,7 +1714,11 @@ struct ArrayImporter { std::shared_ptr* out = &data_->buffers[buffer_id]; auto data = reinterpret_cast(c_struct_->buffers[buffer_id]); if (data != nullptr) { - *out = std::make_shared(data, buffer_size, import_); + if (memory_mgr_) { + *out = std::make_shared(data, buffer_size, memory_mgr_, device_type_, import_); + } else { + *out = std::make_shared(data, buffer_size, import_); + } } else if (is_null_bitmap) { out->reset(); } else { @@ -1613,6 +1743,9 @@ struct ArrayImporter { // For imported null buffer pointers std::shared_ptr zero_size_buffer_; + + std::shared_ptr memory_mgr_; + DeviceType device_type_; }; } // namespace @@ -1652,6 +1785,46 @@ Result> ImportRecordBatch(struct ArrowArray* array, return ImportRecordBatch(array, *maybe_schema); } +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + std::shared_ptr type, + const DeviceMemoryMgr& mapper) { + ArrayImporter importer(type); + RETURN_NOT_OK(importer.Import(array, mapper)); + return importer.MakeArray(); +} + +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + struct ArrowSchema* type, + const DeviceMemoryMgr& mapper) { + auto maybe_type = ImportType(type); + if (!maybe_type.ok()) { + ArrowArrayRelease(&array->array); + return maybe_type.status(); + } + return ImportDeviceArray(array, *maybe_type, mapper); +} + +Result> ImportDeviceRecordBatch(struct ArrowDeviceArray* array, + std::shared_ptr schema, + const DeviceMemoryMgr& mapper) { + auto type = struct_(schema->fields()); + ArrayImporter importer(type); + RETURN_NOT_OK(importer.Import(array, mapper)); + return importer.MakeRecordBatch(std::move(schema)); +} + +Result> ImportDeviceRecordBatch(struct ArrowDeviceArray* array, + struct ArrowSchema* schema, + const DeviceMemoryMgr& mapper) { + auto maybe_schema = ImportSchema(schema); + if (!maybe_schema.ok()) { + ArrowArrayRelease(&array->array); + return maybe_schema.status(); + } + return ImportDeviceRecordBatch(array, *maybe_schema, mapper); +} + + ////////////////////////////////////////////////////////////////////////// // C stream export diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 3b1a013d20dbf..388b093e0f06e 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -166,6 +166,135 @@ Result> ImportRecordBatch(struct ArrowArray* array, /// @} +/// \defgroup c-data-device-interface Functions for working with the C data device interface. +/// +/// @{ + +/// \brief EXPERIMENTAL: Type for freeing a sync event +/// +/// If synchronization is necessary for accessing the data on a device, +/// a pointer to an event needs to be passed when exporting the device +/// array. It's the responsibility of the release function for the array +/// to release the event. +using ReleaseEventFunc = void (*)(void*); + +/// \brief EXPERIMENTAL: Export C++ Array as an ArrowDeviceArray. +/// +/// The resulting ArrowDeviceArray struct keeps the array data and buffers alive +/// until its release callback is called by the consumer. All buffers in +/// the provided array MUST have the same device_type, otherwise an error +/// will be returned. +/// +/// If a non-null sync_event is provided, then the sync_release func must also be +/// non-null. If the sync_event is null, then the sync_release parameter is ignored. +/// +/// \param[in] array Array object to export +/// \param[in] sync_event A pointer to an event-like object if necessary for synchronization, otherwise null. +/// \param[in] sync_release Function pointer to release the sync event +/// \param[out] out C struct to export the array to +/// \param[out] out_schema optional C struct to export the array type to +ARROW_EXPORT +Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc sync_release, + struct ArrowDeviceArray* out, struct ArrowSchema* out_schema = NULLPTR); + +/// \brief EXPERIMENTAL: Export C++ RecordBatch as an ArrowDeviceArray. +/// +/// The record batch is exported as if it were a struct array. +/// The resulting ArrowDeviceArray struct keeps the record batch data and buffers alive +/// until its release callback is called by the consumer. +/// +/// All buffers of all columns in the record batch must have the same device_type +/// otherwise an error will be returned. If columns are on different devices, +/// they should be exported using different ArrowDeviceArray instances. +/// +/// If a non-null sync_event is provided, then the sync_release func must also be +/// non-null. If the sync_event is null, then the sync_release parameter is ignored. +/// +/// \param[in] batch Record batch to export +/// \param[in] sync_event A pointer to an event-like object if necessary for synchronization, otherwise null. +/// \param[in] sync_release Function pointer to release the sync event +/// \param[out] out C struct where to export the record batch +/// \param[out] out_schema optional C struct where to export the record batch schema +ARROW_EXPORT +Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, ReleaseEventFunc sync_release, + struct ArrowDeviceArray* out, struct ArrowSchema* out_schema = NULLPTR); + + +class ARROW_EXPORT DeviceMemoryMgr { + public: + virtual ~DeviceMemoryMgr() = default; + + virtual Result> get_manager(ArrowDeviceType device_type, int64_t device_id) const = 0; +}; + +/// \brief EXPERIMENTAL: Import C++ device array from the C data interface. +/// +/// The ArrowArray struct has its contents moved (as per the C data interface +/// specification) to a private object held alive by the resulting array. The +/// buffers of the Array are located on the device indicated by the device_type. +/// +/// \param[in,out] array C data interface struct holding the array data +/// \param[in] type type of the imported array +/// \param[in] mapper An object with a `get_manager` method to map a device type to a memory manager +/// \return Imported array object +ARROW_EXPORT +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + std::shared_ptr type, + const DeviceMemoryMgr& mapper); + +/// \brief EXPERIMENTAL: Import C++ device array and its type from the C data interface. +/// +/// The ArrowArray struct has its contents moved (as per the C data interface +/// specification) to a private object held alive by the resulting array. +/// The ArrowSchema struct is released, even if this function fails. The +/// buffers of the Array are located on the device indicated by the device_type. +/// +/// \param[in,out] array C data interface struct holding the array data +/// \param[in,out] type C data interface struct holding the array type +/// \param[in] mapper An object with a `get_manager` method to map a device type to a memory manager +/// \return Imported array object +ARROW_EXPORT +Result> ImportDeviceArray(struct ArrowDeviceArray* array, + struct ArrowSchema* type, + const DeviceMemoryMgr& mapper); + +/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data interface. +/// +/// The ArrowArray struct has its contents moved (as per the C data interface +/// specification) to a private object held alive by the resulting record batch. +/// The buffers of all columns of the record batch are located on the device +/// indicated by the device type. +/// +/// \param[in,out] array C data interface struct holding the record batch data +/// \param[in] schema schema of the imported record batch +/// \param[in] mapper An object with a `get_manager` method to map a device type to a memory manager +/// \return Imported record batch object +ARROW_EXPORT +Result> ImportDeviceRecordBatch(struct ArrowArray* array, + std::shared_ptr schema, + const DeviceMemoryMgr& mapper); + +/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema from the C data interface. +/// +/// The type represented by the ArrowSchema struct must be a struct type array. +/// The ArrowArray struct has its contents moved (as per the C data interface +/// specification) to a private object held alive by the resulting record batch. +/// The ArrowSchema struct is released, even if this function fails. The buffers +/// of all columns of the record batch are located on the device indicated by the +/// device type. +/// +/// \param[in,out] array C data interface struct holding the record batch data +/// \param[in,out] schema C data interface struct holding the record batch schema +/// \param[in] mapper An object with a `get_manager` method to map a device type to a memory manager +/// \return Imported record batch object +ARROW_EXPORT +Result> ImportDeviceRecordBatch(struct ArrowArray* array, + struct ArrowSchema* schema, + const DeviceMemoryMgr& mapper); + + +/// @} + /// \defgroup c-stream-interface Functions for working with the C data interface. /// /// @{ diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index 67c62a5181f28..f12ed0b89397e 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -29,6 +29,25 @@ namespace arrow { +/// \brief EXPERIMENTAL: Device type enum which matches up with C Data Device types +enum class DeviceType : char { + UNKNOWN = 0, + CPU = 1, + CUDA = 2, + CUDA_HOST = 3, + OPENCL = 4, + VULKAN = 7, + METAL = 8, + VPI = 9, + ROCM = 10, + ROCM_HOST = 11, + EXT_DEV = 12, + CUDA_MANAGED = 13, + ONEAPI = 14, + WEBGPU = 15, + HEXAGON = 16, +}; + class MemoryManager; /// \brief EXPERIMENTAL: Abstract interface for hardware devices @@ -58,6 +77,12 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this, /// \brief Whether this instance points to the same device as another one. virtual bool Equals(const Device&) const = 0; + /// \brief A device ID to identify this device if there are multiple of this type. + /// + /// If there is no "device_id" equivalent (such as for the main CPU device) + /// returns -1. + virtual int64_t device_id() const { return -1; } + /// \brief Whether this device is the main CPU device. /// /// This shorthand method is very useful when deciding whether a memory address @@ -71,6 +96,9 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this, /// MemoryManager instances with non-default parameters. virtual std::shared_ptr default_memory_manager() = 0; + /// \brief Return the DeviceType of this device + virtual DeviceType device_type() const = 0; + protected: ARROW_DISALLOW_COPY_AND_ASSIGN(Device); explicit Device(bool is_cpu = false) : is_cpu_(is_cpu) {} @@ -172,8 +200,9 @@ class ARROW_EXPORT CPUDevice : public Device { const char* type_name() const override; std::string ToString() const override; bool Equals(const Device&) const override; + DeviceType device_type() const override { return DeviceType::CPU; } - std::shared_ptr default_memory_manager() override; + std::shared_ptr default_memory_manager() override; /// \brief Return the global CPUDevice instance static std::shared_ptr Instance(); diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index f754c07d13c89..423cfb04af918 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -384,7 +384,7 @@ Result> CudaMemoryManager::ViewBufferTo( if (to->is_cpu()) { // Device-on-CPU view ARROW_ASSIGN_OR_RAISE(auto address, GetHostAddress(buf->address())); - return std::make_shared(address, buf->size(), to, buf); + return std::make_shared(address, buf->size(), to, buf, DeviceType::CUDA_HOST); } return nullptr; } @@ -521,7 +521,7 @@ Result> CudaDeviceManager::GetSharedContext( Result> CudaDeviceManager::AllocateHost(int device_number, int64_t nbytes) { uint8_t* data = nullptr; - RETURN_NOT_OK(impl_->AllocateHost(device_number, nbytes, &data)); + RETURN_NOT_OK(impl_->AllocateHost(device_number, nbytes, &data)); return std::make_shared(data, nbytes); } diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index 0115ed19a103d..e00687e7c6818 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -92,6 +92,8 @@ class ARROW_EXPORT CudaDevice : public Device { std::string ToString() const override; bool Equals(const Device&) const override; std::shared_ptr default_memory_manager() override; + DeviceType device_type() const override { return DeviceType::CUDA; } + int64_t device_id() const override { return device_number(); } /// \brief Return a CudaDevice instance for a particular device /// \param[in] device_number the CUDA device number diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 297e4dcf71e44..91020f085d459 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -198,6 +198,11 @@ Result> CudaBuffer::ExportForIpc() { return handle; } +CudaHostBuffer::CudaHostBuffer(uint8_t* data, const int64_t size) + : MutableBuffer(data, size) { + device_type_ = DeviceType::CUDA_HOST; +} + CudaHostBuffer::~CudaHostBuffer() { auto maybe_manager = CudaDeviceManager::Instance(); ARROW_CHECK_OK(maybe_manager.status()); diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index 18c23a507805a..0b92c020e8b83 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -110,7 +110,8 @@ class ARROW_EXPORT CudaBuffer : public Buffer { /// \brief Device-accessible CPU memory created using cudaHostAlloc class ARROW_EXPORT CudaHostBuffer : public MutableBuffer { public: - using MutableBuffer::MutableBuffer; + CudaHostBuffer(uint8_t* data, const int64_t size); + ~CudaHostBuffer(); /// \brief Return a device address the GPU can read this memory from. diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc index aac45d13831e5..19d6366c555ce 100644 --- a/cpp/src/arrow/gpu/cuda_test.cc +++ b/cpp/src/arrow/gpu/cuda_test.cc @@ -364,6 +364,7 @@ TEST_F(TestCudaHostBuffer, AllocateGlobal) { ASSERT_TRUE(host_buffer->is_cpu()); ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_); + ASSERT_EQ(host_buffer->device_type(), DeviceType::CUDA_HOST); ASSERT_OK_AND_ASSIGN(auto device_address, host_buffer->GetDeviceAddress(context_)); ASSERT_NE(device_address, 0); @@ -376,6 +377,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_TRUE(host_buffer->is_cpu()); ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_); + ASSERT_EQ(host_buffer->device_type(), DeviceType::CUDA_HOST); // Try to view the host buffer on the device. This should correspond to // GetDeviceAddress() in the previous test. @@ -385,6 +387,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_NE(device_buffer->address(), 0); ASSERT_EQ(device_buffer->size(), host_buffer->size()); ASSERT_EQ(device_buffer->parent(), host_buffer); + ASSERT_EQ(device_buffer->device_type(), DeviceType::CUDA); // View back the device buffer on the CPU. This should roundtrip. ASSERT_OK_AND_ASSIGN(auto buffer, Buffer::View(device_buffer, cpu_mm_)); @@ -393,6 +396,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_EQ(buffer->address(), host_buffer->address()); ASSERT_EQ(buffer->size(), host_buffer->size()); ASSERT_EQ(buffer->parent(), device_buffer); + ASSERT_EQ(buffer->device_type(), DeviceType::CUDA_HOST); } // ------------------------------------------------------------------------ From 4d0ac93262053276a39c624790a24256896daead Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 5 Jul 2023 13:44:52 -0400 Subject: [PATCH 02/16] linter --- cpp/src/arrow/c/bridge.cc | 69 +++++++++++++++++++++----------------- cpp/src/arrow/c/bridge.h | 70 +++++++++++++++++++++------------------ 2 files changed, 76 insertions(+), 63 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index ca3a0de654c6f..7618e3320491d 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -680,7 +680,7 @@ Result> validate_device_info(const ArrayData& dat int64_t device_id = -1; for (const auto& buf : data.buffers) { - if (!buf) { + if (!buf) { // some buffers might be null // for example, the null bitmap is optional if there's no nulls continue; @@ -693,11 +693,13 @@ Result> validate_device_info(const ArrayData& dat } if (buf->device_type() != device_type) { - return Status::Invalid("exporting device array with buffers on more than one device."); + return Status::Invalid( + "exporting device array with buffers on more than one device."); } if (buf->device()->device_id() != device_id) { - return Status::Invalid("exporting device array with buffers on multiple device ids."); + return Status::Invalid( + "exporting device array with buffers on multiple device ids."); } } @@ -706,19 +708,22 @@ Result> validate_device_info(const ArrayData& dat for (const auto& child : data.child_data) { ARROW_ASSIGN_OR_RAISE(auto device_info, validate_device_info(*child)); if (info != device_info) { - return Status::Invalid("exporting device array with buffers on more than one device."); + return Status::Invalid( + "exporting device array with buffers on more than one device."); } } return info; } -Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc sync_release, - struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { +Status ExportDeviceArray(const Array& array, void* sync_event, + ReleaseEventFunc sync_release, struct ArrowDeviceArray* out, + struct ArrowSchema* out_schema) { if (sync_event != nullptr && sync_release == nullptr) { - return Status::Invalid("must provide a release event function if providing a non-null event"); + return Status::Invalid( + "must provide a release event function if providing a non-null event"); } - + SchemaExportGuard guard(out_schema); if (out_schema != nullptr) { RETURN_NOT_OK(ExportType(*array.type(), out_schema)); @@ -731,20 +736,23 @@ Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc ArrayExporter exporter; RETURN_NOT_OK(exporter.Export(array.data())); exporter.Finish(&out->array); - + auto* pdata = reinterpret_cast(out->array.private_data); pdata->sync_event_ = sync_event; pdata->sync_release_ = sync_release; - out->sync_event = sync_event; + out->sync_event = sync_event; guard.Detach(); return Status::OK(); } -Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, ReleaseEventFunc sync_release, - struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { +Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, + ReleaseEventFunc sync_release, + struct ArrowDeviceArray* out, + struct ArrowSchema* out_schema) { if (sync_event != nullptr && sync_release == nullptr) { - return Status::Invalid("must provide a release event function if providing a non-null event"); + return Status::Invalid( + "must provide a release event function if providing a non-null event"); } // XXX perhaps bypass ToStructArray for speed? @@ -763,12 +771,12 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, Relea ArrayExporter exporter; RETURN_NOT_OK(exporter.Export(array->data())); exporter.Finish(&out->array); - + auto* pdata = reinterpret_cast(out->array.private_data); pdata->sync_event_ = sync_event; pdata->sync_release_ = sync_release; - out->sync_event = sync_event; - + out->sync_event = sync_event; + guard.Detach(); return Status::OK(); } @@ -1378,10 +1386,8 @@ class ImportedBuffer : public Buffer { std::shared_ptr import) : Buffer(data, size), import_(std::move(import)) {} - ImportedBuffer(const uint8_t* data, int64_t size, - std::shared_ptr mm, - DeviceType device_type, - std::shared_ptr import) + ImportedBuffer(const uint8_t* data, int64_t size, std::shared_ptr mm, + DeviceType device_type, std::shared_ptr import) : Buffer(data, size, mm, nullptr, device_type), import_(std::move(import)) {} ~ImportedBuffer() override {} @@ -1392,10 +1398,13 @@ class ImportedBuffer : public Buffer { struct ArrayImporter { explicit ArrayImporter(const std::shared_ptr& type) - : type_(type), zero_size_buffer_(std::make_shared(kZeroSizeArea, 0)), device_type_(DeviceType::CPU) {} + : type_(type), + zero_size_buffer_(std::make_shared(kZeroSizeArea, 0)), + device_type_(DeviceType::CPU) {} Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMgr& mapper) { - ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper.get_manager(src->device_type, src->device_id)); + ARROW_ASSIGN_OR_RAISE(memory_mgr_, + mapper.get_manager(src->device_type, src->device_id)); device_type_ = static_cast(src->device_type); RETURN_NOT_OK(Import(&src->array)); import_->sync_event_ = src->sync_event; @@ -1715,7 +1724,8 @@ struct ArrayImporter { auto data = reinterpret_cast(c_struct_->buffers[buffer_id]); if (data != nullptr) { if (memory_mgr_) { - *out = std::make_shared(data, buffer_size, memory_mgr_, device_type_, import_); + *out = std::make_shared(data, buffer_size, memory_mgr_, + device_type_, import_); } else { *out = std::make_shared(data, buffer_size, import_); } @@ -1804,18 +1814,18 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, return ImportDeviceArray(array, *maybe_type, mapper); } -Result> ImportDeviceRecordBatch(struct ArrowDeviceArray* array, - std::shared_ptr schema, - const DeviceMemoryMgr& mapper) { +Result> ImportDeviceRecordBatch( + struct ArrowDeviceArray* array, std::shared_ptr schema, + const DeviceMemoryMgr& mapper) { auto type = struct_(schema->fields()); ArrayImporter importer(type); RETURN_NOT_OK(importer.Import(array, mapper)); return importer.MakeRecordBatch(std::move(schema)); } -Result> ImportDeviceRecordBatch(struct ArrowDeviceArray* array, - struct ArrowSchema* schema, - const DeviceMemoryMgr& mapper) { +Result> ImportDeviceRecordBatch( + struct ArrowDeviceArray* array, struct ArrowSchema* schema, + const DeviceMemoryMgr& mapper) { auto maybe_schema = ImportSchema(schema); if (!maybe_schema.ok()) { ArrowArrayRelease(&array->array); @@ -1824,7 +1834,6 @@ Result> ImportDeviceRecordBatch(struct ArrowDeviceA return ImportDeviceRecordBatch(array, *maybe_schema, mapper); } - ////////////////////////////////////////////////////////////////////////// // C stream export diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 388b093e0f06e..4161a148cb17e 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -166,14 +166,15 @@ Result> ImportRecordBatch(struct ArrowArray* array, /// @} -/// \defgroup c-data-device-interface Functions for working with the C data device interface. +/// \defgroup c-data-device-interface Functions for working with the C data device +/// interface. /// /// @{ /// \brief EXPERIMENTAL: Type for freeing a sync event /// /// If synchronization is necessary for accessing the data on a device, -/// a pointer to an event needs to be passed when exporting the device +/// a pointer to an event needs to be passed when exporting the device /// array. It's the responsibility of the release function for the array /// to release the event. using ReleaseEventFunc = void (*)(void*); @@ -189,13 +190,14 @@ using ReleaseEventFunc = void (*)(void*); /// non-null. If the sync_event is null, then the sync_release parameter is ignored. /// /// \param[in] array Array object to export -/// \param[in] sync_event A pointer to an event-like object if necessary for synchronization, otherwise null. -/// \param[in] sync_release Function pointer to release the sync event -/// \param[out] out C struct to export the array to -/// \param[out] out_schema optional C struct to export the array type to +/// \param[in] sync_event A pointer to an event-like object if necessary for +/// synchronization, otherwise null. \param[in] sync_release Function pointer to release +/// the sync event \param[out] out C struct to export the array to \param[out] out_schema +/// optional C struct to export the array type to ARROW_EXPORT -Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc sync_release, - struct ArrowDeviceArray* out, struct ArrowSchema* out_schema = NULLPTR); +Status ExportDeviceArray(const Array& array, void* sync_event, + ReleaseEventFunc sync_release, struct ArrowDeviceArray* out, + struct ArrowSchema* out_schema = NULLPTR); /// \brief EXPERIMENTAL: Export C++ RecordBatch as an ArrowDeviceArray. /// @@ -211,20 +213,22 @@ Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc /// non-null. If the sync_event is null, then the sync_release parameter is ignored. /// /// \param[in] batch Record batch to export -/// \param[in] sync_event A pointer to an event-like object if necessary for synchronization, otherwise null. -/// \param[in] sync_release Function pointer to release the sync event -/// \param[out] out C struct where to export the record batch -/// \param[out] out_schema optional C struct where to export the record batch schema +/// \param[in] sync_event A pointer to an event-like object if necessary for +/// synchronization, otherwise null. \param[in] sync_release Function pointer to release +/// the sync event \param[out] out C struct where to export the record batch \param[out] +/// out_schema optional C struct where to export the record batch schema ARROW_EXPORT -Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, ReleaseEventFunc sync_release, - struct ArrowDeviceArray* out, struct ArrowSchema* out_schema = NULLPTR); - +Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, + ReleaseEventFunc sync_release, + struct ArrowDeviceArray* out, + struct ArrowSchema* out_schema = NULLPTR); class ARROW_EXPORT DeviceMemoryMgr { public: virtual ~DeviceMemoryMgr() = default; - virtual Result> get_manager(ArrowDeviceType device_type, int64_t device_id) const = 0; + virtual Result> get_manager(ArrowDeviceType device_type, + int64_t device_id) const = 0; }; /// \brief EXPERIMENTAL: Import C++ device array from the C data interface. @@ -235,8 +239,8 @@ class ARROW_EXPORT DeviceMemoryMgr { /// /// \param[in,out] array C data interface struct holding the array data /// \param[in] type type of the imported array -/// \param[in] mapper An object with a `get_manager` method to map a device type to a memory manager -/// \return Imported array object +/// \param[in] mapper An object with a `get_manager` method to map a device type to a +/// memory manager \return Imported array object ARROW_EXPORT Result> ImportDeviceArray(struct ArrowDeviceArray* array, std::shared_ptr type, @@ -251,14 +255,15 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, /// /// \param[in,out] array C data interface struct holding the array data /// \param[in,out] type C data interface struct holding the array type -/// \param[in] mapper An object with a `get_manager` method to map a device type to a memory manager -/// \return Imported array object +/// \param[in] mapper An object with a `get_manager` method to map a device type to a +/// memory manager \return Imported array object ARROW_EXPORT Result> ImportDeviceArray(struct ArrowDeviceArray* array, struct ArrowSchema* type, const DeviceMemoryMgr& mapper); -/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data interface. +/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data +/// interface. /// /// The ArrowArray struct has its contents moved (as per the C data interface /// specification) to a private object held alive by the resulting record batch. @@ -267,14 +272,15 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, /// /// \param[in,out] array C data interface struct holding the record batch data /// \param[in] schema schema of the imported record batch -/// \param[in] mapper An object with a `get_manager` method to map a device type to a memory manager -/// \return Imported record batch object +/// \param[in] mapper An object with a `get_manager` method to map a device type to a +/// memory manager \return Imported record batch object ARROW_EXPORT -Result> ImportDeviceRecordBatch(struct ArrowArray* array, - std::shared_ptr schema, - const DeviceMemoryMgr& mapper); +Result> ImportDeviceRecordBatch( + struct ArrowArray* array, std::shared_ptr schema, + const DeviceMemoryMgr& mapper); -/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema from the C data interface. +/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema +/// from the C data interface. /// /// The type represented by the ArrowSchema struct must be a struct type array. /// The ArrowArray struct has its contents moved (as per the C data interface @@ -285,13 +291,11 @@ Result> ImportDeviceRecordBatch(struct ArrowArray* /// /// \param[in,out] array C data interface struct holding the record batch data /// \param[in,out] schema C data interface struct holding the record batch schema -/// \param[in] mapper An object with a `get_manager` method to map a device type to a memory manager -/// \return Imported record batch object +/// \param[in] mapper An object with a `get_manager` method to map a device type to a +/// memory manager \return Imported record batch object ARROW_EXPORT -Result> ImportDeviceRecordBatch(struct ArrowArray* array, - struct ArrowSchema* schema, - const DeviceMemoryMgr& mapper); - +Result> ImportDeviceRecordBatch( + struct ArrowArray* array, struct ArrowSchema* schema, const DeviceMemoryMgr& mapper); /// @} From 30256c169b8bef6b63ffd4d3575d2cdc1521c00b Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 5 Jul 2023 14:53:45 -0400 Subject: [PATCH 03/16] Linting --- cpp/src/arrow/buffer.h | 10 ++++++++-- cpp/src/arrow/device.h | 2 +- cpp/src/arrow/gpu/cuda_context.cc | 2 +- cpp/src/arrow/gpu/cuda_memory.cc | 4 ++-- cpp/src/arrow/gpu/cuda_memory.h | 2 +- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 4fc9d94c18229..9d7eb766c856b 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -57,12 +57,18 @@ class ARROW_EXPORT Buffer { /// /// \note The passed memory must be kept alive through some other means Buffer(const uint8_t* data, int64_t size) - : is_mutable_(false), is_cpu_(true), data_(data), size_(size), capacity_(size), device_type_(DeviceType::CPU) { + : is_mutable_(false), + is_cpu_(true), + data_(data), + size_(size), + capacity_(size), + device_type_(DeviceType::CPU) { SetMemoryManager(default_cpu_memory_manager()); } Buffer(const uint8_t* data, int64_t size, std::shared_ptr mm, - std::shared_ptr parent = NULLPTR, DeviceType device_type = DeviceType::UNKNOWN) + std::shared_ptr parent = NULLPTR, + DeviceType device_type = DeviceType::UNKNOWN) : is_mutable_(false), data_(data), size_(size), diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index f12ed0b89397e..9bba7ec4fdf7f 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -202,7 +202,7 @@ class ARROW_EXPORT CPUDevice : public Device { bool Equals(const Device&) const override; DeviceType device_type() const override { return DeviceType::CPU; } - std::shared_ptr default_memory_manager() override; + std::shared_ptr default_memory_manager() override; /// \brief Return the global CPUDevice instance static std::shared_ptr Instance(); diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index 423cfb04af918..5bcf6e4f5a886 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -521,7 +521,7 @@ Result> CudaDeviceManager::GetSharedContext( Result> CudaDeviceManager::AllocateHost(int device_number, int64_t nbytes) { uint8_t* data = nullptr; - RETURN_NOT_OK(impl_->AllocateHost(device_number, nbytes, &data)); + RETURN_NOT_OK(impl_->AllocateHost(device_number, nbytes, &data)); return std::make_shared(data, nbytes); } diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 91020f085d459..e3ec08ea24de0 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -199,8 +199,8 @@ Result> CudaBuffer::ExportForIpc() { } CudaHostBuffer::CudaHostBuffer(uint8_t* data, const int64_t size) - : MutableBuffer(data, size) { - device_type_ = DeviceType::CUDA_HOST; + : MutableBuffer(data, size) { + device_type_ = DeviceType::CUDA_HOST; } CudaHostBuffer::~CudaHostBuffer() { diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index 0b92c020e8b83..2ec98db209c73 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -111,7 +111,7 @@ class ARROW_EXPORT CudaBuffer : public Buffer { class ARROW_EXPORT CudaHostBuffer : public MutableBuffer { public: CudaHostBuffer(uint8_t* data, const int64_t size); - + ~CudaHostBuffer(); /// \brief Return a device address the GPU can read this memory from. From d59120822f9e3bb2459a3209f455824f813f5606 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 6 Jul 2023 11:13:23 -0400 Subject: [PATCH 04/16] changes from review comments --- cpp/src/arrow/c/bridge.cc | 24 +++++++++++++----------- cpp/src/arrow/c/bridge.h | 9 +++++---- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 7618e3320491d..525d209d57f46 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -522,8 +522,8 @@ struct ExportedArrayPrivateData : PoolAllocationMixin std::shared_ptr data_; - ReleaseEventFunc sync_release_; - void* sync_event_; + ReleaseEventFunc sync_release_ = nullptr; + void* sync_event_ = nullptr; ExportedArrayPrivateData() = default; ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData); @@ -675,7 +675,7 @@ Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out, ////////////////////////////////////////////////////////////////////////// // C device arrays -Result> validate_device_info(const ArrayData& data) { +Result> ValidateDeviceInfo(const ArrayData& data) { DeviceType device_type = DeviceType::UNKNOWN; int64_t device_id = -1; @@ -694,22 +694,22 @@ Result> validate_device_info(const ArrayData& dat if (buf->device_type() != device_type) { return Status::Invalid( - "exporting device array with buffers on more than one device."); + "Exporting device array with buffers on more than one device."); } if (buf->device()->device_id() != device_id) { return Status::Invalid( - "exporting device array with buffers on multiple device ids."); + "Exporting device array with buffers on multiple device ids."); } } // recursively check the children auto info = std::make_pair(device_type, device_id); for (const auto& child : data.child_data) { - ARROW_ASSIGN_OR_RAISE(auto device_info, validate_device_info(*child)); + ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*child)); if (info != device_info) { return Status::Invalid( - "exporting device array with buffers on more than one device."); + "Exporting device array with buffers on more than one device."); } } @@ -721,7 +721,7 @@ Status ExportDeviceArray(const Array& array, void* sync_event, struct ArrowSchema* out_schema) { if (sync_event != nullptr && sync_release == nullptr) { return Status::Invalid( - "must provide a release event function if providing a non-null event"); + "Must provide a release event function if providing a non-null event"); } SchemaExportGuard guard(out_schema); @@ -729,7 +729,7 @@ Status ExportDeviceArray(const Array& array, void* sync_event, RETURN_NOT_OK(ExportType(*array.type(), out_schema)); } - ARROW_ASSIGN_OR_RAISE(auto device_info, validate_device_info(*array.data())); + ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array.data())); out->device_type = static_cast(device_info.first); out->device_id = device_info.second; @@ -752,7 +752,7 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, struct ArrowSchema* out_schema) { if (sync_event != nullptr && sync_release == nullptr) { return Status::Invalid( - "must provide a release event function if providing a non-null event"); + "Must provide a release event function if providing a non-null event"); } // XXX perhaps bypass ToStructArray for speed? @@ -764,7 +764,7 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, RETURN_NOT_OK(ExportSchema(*batch.schema(), out_schema)); } - ARROW_ASSIGN_OR_RAISE(auto device_info, validate_device_info(*array->data())); + ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array->data())); out->device_type = static_cast(device_info.first); out->device_id = device_info.second; @@ -1408,7 +1408,9 @@ struct ArrayImporter { device_type_ = static_cast(src->device_type); RETURN_NOT_OK(Import(&src->array)); import_->sync_event_ = src->sync_event; + // reset internal state before next import memory_mgr_.reset(); + device_type_ = DeviceType::CPU; return Status::OK(); } diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 4161a148cb17e..f71b7af764d59 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -187,13 +187,14 @@ using ReleaseEventFunc = void (*)(void*); /// will be returned. /// /// If a non-null sync_event is provided, then the sync_release func must also be -/// non-null. If the sync_event is null, then the sync_release parameter is ignored. +/// non-null. If the sync_event is null, then the sync_release parameter is not called. /// /// \param[in] array Array object to export /// \param[in] sync_event A pointer to an event-like object if necessary for -/// synchronization, otherwise null. \param[in] sync_release Function pointer to release -/// the sync event \param[out] out C struct to export the array to \param[out] out_schema -/// optional C struct to export the array type to +/// synchronization, otherwise null. +/// \param[in] sync_release Function pointer to release the sync event +/// \param[out] out C struct to export the array to +/// \param[out] out_schema optional C struct to export the array type to ARROW_EXPORT Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc sync_release, struct ArrowDeviceArray* out, From 58c15b04176351d1efd8b8b9a69dfb053bb4b7be Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 10 Jul 2023 11:06:33 -0400 Subject: [PATCH 05/16] update from feedback --- cpp/src/arrow/c/bridge.cc | 37 +++++++++++++++++-------------------- cpp/src/arrow/c/bridge.h | 6 +++--- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 525d209d57f46..bc371df548633 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -675,47 +675,44 @@ Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out, ////////////////////////////////////////////////////////////////////////// // C device arrays -Result> ValidateDeviceInfo(const ArrayData& data) { - DeviceType device_type = DeviceType::UNKNOWN; - int64_t device_id = -1; - +Status ValidateDeviceInfo(const ArrayData& data, DeviceType* device_type, int64_t* device_id) { for (const auto& buf : data.buffers) { if (!buf) { - // some buffers might be null - // for example, the null bitmap is optional if there's no nulls continue; } - if (device_type == DeviceType::UNKNOWN) { - device_type = buf->device_type(); - device_id = buf->device()->device_id(); + if (*device_type == DeviceType::UNKNOWN) { + *device_type = buf->device_type(); + *device_id = buf->device()->device_id(); continue; } - if (buf->device_type() != device_type) { + if (buf->device_type() != *device_type) { return Status::Invalid( "Exporting device array with buffers on more than one device."); } - if (buf->device()->device_id() != device_id) { + if (buf->device()->device_id() != *device_id) { return Status::Invalid( "Exporting device array with buffers on multiple device ids."); - } + } } - // recursively check the children - auto info = std::make_pair(device_type, device_id); for (const auto& child : data.child_data) { - ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*child)); - if (info != device_info) { - return Status::Invalid( - "Exporting device array with buffers on more than one device."); - } + RETURN_NOT_OK(ValidateDeviceInfo(*child, device_type, device_id)); } - return info; + return Status::OK(); } +Result> ValidateDeviceInfo(const ArrayData& data) { + DeviceType device_type = DeviceType::UNKNOWN; + int64_t device_id = -1; + RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id)); + return std::make_pair(device_type, device_id); +} + + Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc sync_release, struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index f71b7af764d59..45c0070596137 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -191,9 +191,9 @@ using ReleaseEventFunc = void (*)(void*); /// /// \param[in] array Array object to export /// \param[in] sync_event A pointer to an event-like object if necessary for -/// synchronization, otherwise null. -/// \param[in] sync_release Function pointer to release the sync event -/// \param[out] out C struct to export the array to +/// synchronization, otherwise null. +/// \param[in] sync_release Function pointer to release the sync event +/// \param[out] out C struct to export the array to /// \param[out] out_schema optional C struct to export the array type to ARROW_EXPORT Status ExportDeviceArray(const Array& array, void* sync_event, From 2d3bcb6e4b1f571ce6ac322a5a60283bfdfdcb3e Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 10 Jul 2023 11:20:30 -0400 Subject: [PATCH 06/16] linting --- cpp/src/arrow/c/bridge.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index bc371df548633..798dd4ba586f0 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -675,7 +675,8 @@ Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out, ////////////////////////////////////////////////////////////////////////// // C device arrays -Status ValidateDeviceInfo(const ArrayData& data, DeviceType* device_type, int64_t* device_id) { +Status ValidateDeviceInfo(const ArrayData& data, DeviceType* device_type, + int64_t* device_id) { for (const auto& buf : data.buffers) { if (!buf) { continue; @@ -695,7 +696,7 @@ Status ValidateDeviceInfo(const ArrayData& data, DeviceType* device_type, int64_ if (buf->device()->device_id() != *device_id) { return Status::Invalid( "Exporting device array with buffers on multiple device ids."); - } + } } for (const auto& child : data.child_data) { @@ -712,7 +713,6 @@ Result> ValidateDeviceInfo(const ArrayData& data) return std::make_pair(device_type, device_id); } - Status ExportDeviceArray(const Array& array, void* sync_event, ReleaseEventFunc sync_release, struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { From ff2dd1932a31b82f77ca7d893ee9f63f4c2ba3e5 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 12 Jul 2023 13:56:40 -0400 Subject: [PATCH 07/16] changes based on feedback --- cpp/src/arrow/buffer.h | 11 +++--- cpp/src/arrow/buffer_test.cc | 14 +++---- cpp/src/arrow/c/bridge.cc | 64 ++++++++++++++++--------------- cpp/src/arrow/c/bridge.h | 45 +++++++++++----------- cpp/src/arrow/device.h | 33 ++++++++-------- cpp/src/arrow/gpu/cuda_context.cc | 2 +- cpp/src/arrow/gpu/cuda_context.h | 2 +- cpp/src/arrow/gpu/cuda_memory.cc | 19 ++++++++- cpp/src/arrow/gpu/cuda_memory.h | 5 +++ cpp/src/arrow/gpu/cuda_test.cc | 8 ++-- 10 files changed, 114 insertions(+), 89 deletions(-) diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 9d7eb766c856b..01629cfaa158a 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -62,13 +63,13 @@ class ARROW_EXPORT Buffer { data_(data), size_(size), capacity_(size), - device_type_(DeviceType::CPU) { + device_type_(DeviceType::kCPU) { SetMemoryManager(default_cpu_memory_manager()); } Buffer(const uint8_t* data, int64_t size, std::shared_ptr mm, std::shared_ptr parent = NULLPTR, - DeviceType device_type = DeviceType::UNKNOWN) + std::optional device_type = std::nullopt) : is_mutable_(false), data_(data), size_(size), @@ -79,7 +80,7 @@ class ARROW_EXPORT Buffer { // if a device type is specified, use that instead. for example: // CUDA_HOST. The CudaMemoryManager will set device_type_ to CUDA, // but you can specify CUDA_HOST as the device type to override it. - if (device_type != DeviceType::UNKNOWN) { + if (device_type != std::nullopt) { device_type_ = device_type; } } @@ -253,7 +254,7 @@ class ARROW_EXPORT Buffer { const std::shared_ptr& memory_manager() const { return memory_manager_; } - DeviceType device_type() const { return device_type_; } + std::optional device_type() const { return device_type_; } std::shared_ptr parent() const { return parent_; } @@ -309,7 +310,7 @@ class ARROW_EXPORT Buffer { const uint8_t* data_; int64_t size_; int64_t capacity_; - DeviceType device_type_; + std::optional device_type_; // null by default, but may be set std::shared_ptr parent_; diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc index ffae615ae245a..3520a862ae68f 100644 --- a/cpp/src/arrow/buffer_test.cc +++ b/cpp/src/arrow/buffer_test.cc @@ -41,7 +41,7 @@ using internal::checked_cast; using internal::checked_pointer_cast; static const char kMyDeviceTypeName[] = "arrowtest::MyDevice"; -static const DeviceType kMyDeviceType = DeviceType::EXT_DEV; +static const DeviceType kMyDeviceType = DeviceType::kEXT_DEV; static const int kMyDeviceAllowCopy = 1; static const int kMyDeviceAllowView = 2; @@ -259,7 +259,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), cpu_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::CPU); + ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -267,7 +267,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), cpu_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::CPU); + ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -297,7 +297,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), my_copy_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::CPU); + ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -305,7 +305,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), my_copy_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::CPU); + ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -341,7 +341,7 @@ TEST_F(TestDevice, View) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_EQ(buffer->address(), cpu_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::CPU); + ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -361,7 +361,7 @@ TEST_F(TestDevice, View) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_EQ(buffer->address(), my_copy_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::CPU); + ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 798dd4ba586f0..679168143b863 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -522,8 +522,7 @@ struct ExportedArrayPrivateData : PoolAllocationMixin std::shared_ptr data_; - ReleaseEventFunc sync_release_ = nullptr; - void* sync_event_ = nullptr; + RawSyncEvent sync_event_; ExportedArrayPrivateData() = default; ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData); @@ -548,8 +547,8 @@ void ReleaseExportedArray(struct ArrowArray* array) { } DCHECK_NE(array->private_data, nullptr); auto* pdata = reinterpret_cast(array->private_data); - if (pdata->sync_event_ != nullptr && pdata->sync_release_ != nullptr) { - pdata->sync_release_(pdata->sync_event_); + if (pdata->sync_event_.sync_event != nullptr && pdata->sync_event_.release_func != nullptr) { + pdata->sync_event_.release_func(pdata->sync_event_.sync_event); } delete pdata; @@ -591,8 +590,7 @@ struct ArrayExporter { // Store owning pointer to ArrayData export_.data_ = data; - export_.sync_event_ = nullptr; - export_.sync_release_ = nullptr; + export_.sync_event_ = RawSyncEvent(); return Status::OK(); } @@ -675,14 +673,14 @@ Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out, ////////////////////////////////////////////////////////////////////////// // C device arrays -Status ValidateDeviceInfo(const ArrayData& data, DeviceType* device_type, +Status ValidateDeviceInfo(const ArrayData& data, std::optional* device_type, int64_t* device_id) { for (const auto& buf : data.buffers) { if (!buf) { continue; } - if (*device_type == DeviceType::UNKNOWN) { + if (*device_type == std::nullopt) { *device_type = buf->device_type(); *device_id = buf->device()->device_id(); continue; @@ -706,17 +704,17 @@ Status ValidateDeviceInfo(const ArrayData& data, DeviceType* device_type, return Status::OK(); } -Result> ValidateDeviceInfo(const ArrayData& data) { - DeviceType device_type = DeviceType::UNKNOWN; +Result, int64_t>> ValidateDeviceInfo(const ArrayData& data) { + std::optional device_type; int64_t device_id = -1; RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id)); return std::make_pair(device_type, device_id); } -Status ExportDeviceArray(const Array& array, void* sync_event, - ReleaseEventFunc sync_release, struct ArrowDeviceArray* out, +Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event, + struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { - if (sync_event != nullptr && sync_release == nullptr) { + if (sync_event.sync_event != nullptr && sync_event.release_func) { return Status::Invalid( "Must provide a release event function if providing a non-null event"); } @@ -727,7 +725,11 @@ Status ExportDeviceArray(const Array& array, void* sync_event, } ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array.data())); - out->device_type = static_cast(device_info.first); + if (!device_info.first) { + out->device_type = ARROW_DEVICE_CPU; + } else { + out->device_type = static_cast(*device_info.first); + } out->device_id = device_info.second; ArrayExporter exporter; @@ -736,18 +738,17 @@ Status ExportDeviceArray(const Array& array, void* sync_event, auto* pdata = reinterpret_cast(out->array.private_data); pdata->sync_event_ = sync_event; - pdata->sync_release_ = sync_release; - out->sync_event = sync_event; + out->sync_event = sync_event.sync_event; guard.Detach(); return Status::OK(); } -Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, - ReleaseEventFunc sync_release, +Status ExportDeviceRecordBatch(const RecordBatch& batch, + RawSyncEvent sync_event, struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { - if (sync_event != nullptr && sync_release == nullptr) { + if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) { return Status::Invalid( "Must provide a release event function if providing a non-null event"); } @@ -762,7 +763,11 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, } ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array->data())); - out->device_type = static_cast(device_info.first); + if (!device_info.first) { + out->device_type = ARROW_DEVICE_CPU; + } else { + out->device_type = static_cast(*device_info.first); + } out->device_id = device_info.second; ArrayExporter exporter; @@ -771,8 +776,7 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, auto* pdata = reinterpret_cast(out->array.private_data); pdata->sync_event_ = sync_event; - pdata->sync_release_ = sync_release; - out->sync_event = sync_event; + out->sync_event = sync_event.sync_event; guard.Detach(); return Status::OK(); @@ -1397,17 +1401,17 @@ struct ArrayImporter { explicit ArrayImporter(const std::shared_ptr& type) : type_(type), zero_size_buffer_(std::make_shared(kZeroSizeArea, 0)), - device_type_(DeviceType::CPU) {} + device_type_(DeviceType::kCPU) {} - Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMgr& mapper) { + Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper& mapper) { ARROW_ASSIGN_OR_RAISE(memory_mgr_, - mapper.get_manager(src->device_type, src->device_id)); + mapper(src->device_type, src->device_id)); device_type_ = static_cast(src->device_type); RETURN_NOT_OK(Import(&src->array)); import_->sync_event_ = src->sync_event; // reset internal state before next import memory_mgr_.reset(); - device_type_ = DeviceType::CPU; + device_type_ = DeviceType::kCPU; return Status::OK(); } @@ -1796,7 +1800,7 @@ Result> ImportRecordBatch(struct ArrowArray* array, Result> ImportDeviceArray(struct ArrowDeviceArray* array, std::shared_ptr type, - const DeviceMemoryMgr& mapper) { + const DeviceMemoryMapper& mapper) { ArrayImporter importer(type); RETURN_NOT_OK(importer.Import(array, mapper)); return importer.MakeArray(); @@ -1804,7 +1808,7 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, Result> ImportDeviceArray(struct ArrowDeviceArray* array, struct ArrowSchema* type, - const DeviceMemoryMgr& mapper) { + const DeviceMemoryMapper& mapper) { auto maybe_type = ImportType(type); if (!maybe_type.ok()) { ArrowArrayRelease(&array->array); @@ -1815,7 +1819,7 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, std::shared_ptr schema, - const DeviceMemoryMgr& mapper) { + const DeviceMemoryMapper& mapper) { auto type = struct_(schema->fields()); ArrayImporter importer(type); RETURN_NOT_OK(importer.Import(array, mapper)); @@ -1824,7 +1828,7 @@ Result> ImportDeviceRecordBatch( Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, struct ArrowSchema* schema, - const DeviceMemoryMgr& mapper) { + const DeviceMemoryMapper& mapper) { auto maybe_schema = ImportSchema(schema); if (!maybe_schema.ok()) { ArrowArrayRelease(&array->array); diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 45c0070596137..e273d6890f1cf 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include @@ -176,8 +177,11 @@ Result> ImportRecordBatch(struct ArrowArray* array, /// If synchronization is necessary for accessing the data on a device, /// a pointer to an event needs to be passed when exporting the device /// array. It's the responsibility of the release function for the array -/// to release the event. -using ReleaseEventFunc = void (*)(void*); +/// to release the event. Both can be null if no sync'ing is necessary. +struct RawSyncEvent { + void* sync_event = nullptr; + std::function release_func; +}; /// \brief EXPERIMENTAL: Export C++ Array as an ArrowDeviceArray. /// @@ -190,14 +194,12 @@ using ReleaseEventFunc = void (*)(void*); /// non-null. If the sync_event is null, then the sync_release parameter is not called. /// /// \param[in] array Array object to export -/// \param[in] sync_event A pointer to an event-like object if necessary for -/// synchronization, otherwise null. -/// \param[in] sync_release Function pointer to release the sync event +/// \param[in] sync_event A struct containing what is needed for syncing if necessary /// \param[out] out C struct to export the array to /// \param[out] out_schema optional C struct to export the array type to ARROW_EXPORT -Status ExportDeviceArray(const Array& array, void* sync_event, - ReleaseEventFunc sync_release, struct ArrowDeviceArray* out, +Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event, + struct ArrowDeviceArray* out, struct ArrowSchema* out_schema = NULLPTR); /// \brief EXPERIMENTAL: Export C++ RecordBatch as an ArrowDeviceArray. @@ -214,23 +216,20 @@ Status ExportDeviceArray(const Array& array, void* sync_event, /// non-null. If the sync_event is null, then the sync_release parameter is ignored. /// /// \param[in] batch Record batch to export -/// \param[in] sync_event A pointer to an event-like object if necessary for -/// synchronization, otherwise null. \param[in] sync_release Function pointer to release -/// the sync event \param[out] out C struct where to export the record batch \param[out] -/// out_schema optional C struct where to export the record batch schema +/// \param[in] sync_event A struct containing what is needed for syncing if necessary +/// \param[out] out C struct where to export the record batch +/// \param[out] out_schema optional C struct where to export the record batch schema ARROW_EXPORT -Status ExportDeviceRecordBatch(const RecordBatch& batch, void* sync_event, - ReleaseEventFunc sync_release, +Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event, struct ArrowDeviceArray* out, struct ArrowSchema* out_schema = NULLPTR); -class ARROW_EXPORT DeviceMemoryMgr { - public: - virtual ~DeviceMemoryMgr() = default; +using DeviceMemoryMapper = std::function>(ArrowDeviceType, int64_t)>; - virtual Result> get_manager(ArrowDeviceType device_type, - int64_t device_id) const = 0; -}; + +// ARROW_EXPORT +// Result> DefaultMemoryMapper(ArrowDeviceType device_type, +// int64_t device_id); /// \brief EXPERIMENTAL: Import C++ device array from the C data interface. /// @@ -245,7 +244,7 @@ class ARROW_EXPORT DeviceMemoryMgr { ARROW_EXPORT Result> ImportDeviceArray(struct ArrowDeviceArray* array, std::shared_ptr type, - const DeviceMemoryMgr& mapper); + const DeviceMemoryMapper& mapper); /// \brief EXPERIMENTAL: Import C++ device array and its type from the C data interface. /// @@ -261,7 +260,7 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, ARROW_EXPORT Result> ImportDeviceArray(struct ArrowDeviceArray* array, struct ArrowSchema* type, - const DeviceMemoryMgr& mapper); + const DeviceMemoryMapper& mapper); /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data /// interface. @@ -278,7 +277,7 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, ARROW_EXPORT Result> ImportDeviceRecordBatch( struct ArrowArray* array, std::shared_ptr schema, - const DeviceMemoryMgr& mapper); + const DeviceMemoryMapper& mapper); /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema /// from the C data interface. @@ -296,7 +295,7 @@ Result> ImportDeviceRecordBatch( /// memory manager \return Imported record batch object ARROW_EXPORT Result> ImportDeviceRecordBatch( - struct ArrowArray* array, struct ArrowSchema* schema, const DeviceMemoryMgr& mapper); + struct ArrowArray* array, struct ArrowSchema* schema, const DeviceMemoryMapper& mapper); /// @} diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index 9bba7ec4fdf7f..83145c81370f6 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -30,22 +30,21 @@ namespace arrow { /// \brief EXPERIMENTAL: Device type enum which matches up with C Data Device types -enum class DeviceType : char { - UNKNOWN = 0, - CPU = 1, - CUDA = 2, - CUDA_HOST = 3, - OPENCL = 4, - VULKAN = 7, - METAL = 8, - VPI = 9, - ROCM = 10, - ROCM_HOST = 11, - EXT_DEV = 12, - CUDA_MANAGED = 13, - ONEAPI = 14, - WEBGPU = 15, - HEXAGON = 16, +enum class DeviceType : char { + kCPU = 1, + kCUDA = 2, + kCUDA_HOST = 3, + kOPENCL = 4, + kVULKAN = 7, + kMETAL = 8, + kVPI = 9, + kROCM = 10, + kROCM_HOST = 11, + kEXT_DEV = 12, + kCUDA_MANAGED = 13, + kONEAPI = 14, + kWEBGPU = 15, + kHEXAGON = 16, }; class MemoryManager; @@ -200,7 +199,7 @@ class ARROW_EXPORT CPUDevice : public Device { const char* type_name() const override; std::string ToString() const override; bool Equals(const Device&) const override; - DeviceType device_type() const override { return DeviceType::CPU; } + DeviceType device_type() const override { return DeviceType::kCPU; } std::shared_ptr default_memory_manager() override; diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index 5bcf6e4f5a886..027b1a5bec129 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -384,7 +384,7 @@ Result> CudaMemoryManager::ViewBufferTo( if (to->is_cpu()) { // Device-on-CPU view ARROW_ASSIGN_OR_RAISE(auto address, GetHostAddress(buf->address())); - return std::make_shared(address, buf->size(), to, buf, DeviceType::CUDA_HOST); + return std::make_shared(address, buf->size(), to, buf, DeviceType::kCUDA_HOST); } return nullptr; } diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index e00687e7c6818..ab7f414136464 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -92,7 +92,7 @@ class ARROW_EXPORT CudaDevice : public Device { std::string ToString() const override; bool Equals(const Device&) const override; std::shared_ptr default_memory_manager() override; - DeviceType device_type() const override { return DeviceType::CUDA; } + DeviceType device_type() const override { return DeviceType::kCUDA; } int64_t device_id() const override { return device_number(); } /// \brief Return a CudaDevice instance for a particular device diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index e3ec08ea24de0..7aa7cedc06492 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -200,7 +200,7 @@ Result> CudaBuffer::ExportForIpc() { CudaHostBuffer::CudaHostBuffer(uint8_t* data, const int64_t size) : MutableBuffer(data, size) { - device_type_ = DeviceType::CUDA_HOST; + device_type_ = DeviceType::kCUDA_HOST; } CudaHostBuffer::~CudaHostBuffer() { @@ -485,5 +485,22 @@ Result GetHostAddress(uintptr_t device_ptr) { return static_cast(ptr); } +Result> DefaultMemoryMapper(ArrowDeviceType device_type, + int64_t device_id) { + switch (device_type) { + case ARROW_DEVICE_CPU: + return default_cpu_memory_manager(); + case ARROW_DEVICE_CUDA: + case ARROW_DEVICE_CUDA_HOST: + case ARROW_DEVICE_CUDA_MANAGED: + { + ARROW_ASSIGN_OR_RAISE(auto device, arrow::cuda::CudaDevice::Make(device_id)); + return device->default_memory_manager(); + } + default: + return Status::NotImplemented("memory manager not implemented for device"); + } +} + } // namespace cuda } // namespace arrow diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index 2ec98db209c73..d323bef03494e 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -21,6 +21,7 @@ #include #include "arrow/buffer.h" +#include "arrow/c/abi.h" #include "arrow/io/concurrency.h" #include "arrow/type_fwd.h" @@ -259,5 +260,9 @@ Result GetDeviceAddress(const uint8_t* cpu_data, ARROW_EXPORT Result GetHostAddress(uintptr_t device_ptr); +ARROW_EXPORT +Result> DefaultMemoryMapper(ArrowDeviceType device_type, + int64_t device_id); + } // namespace cuda } // namespace arrow diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc index 19d6366c555ce..62ffb60cacfa3 100644 --- a/cpp/src/arrow/gpu/cuda_test.cc +++ b/cpp/src/arrow/gpu/cuda_test.cc @@ -364,7 +364,7 @@ TEST_F(TestCudaHostBuffer, AllocateGlobal) { ASSERT_TRUE(host_buffer->is_cpu()); ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_); - ASSERT_EQ(host_buffer->device_type(), DeviceType::CUDA_HOST); + ASSERT_EQ(host_buffer->device_type(), DeviceType::kCUDA_HOST); ASSERT_OK_AND_ASSIGN(auto device_address, host_buffer->GetDeviceAddress(context_)); ASSERT_NE(device_address, 0); @@ -377,7 +377,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_TRUE(host_buffer->is_cpu()); ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_); - ASSERT_EQ(host_buffer->device_type(), DeviceType::CUDA_HOST); + ASSERT_EQ(host_buffer->device_type(), DeviceType::kCUDA_HOST); // Try to view the host buffer on the device. This should correspond to // GetDeviceAddress() in the previous test. @@ -387,7 +387,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_NE(device_buffer->address(), 0); ASSERT_EQ(device_buffer->size(), host_buffer->size()); ASSERT_EQ(device_buffer->parent(), host_buffer); - ASSERT_EQ(device_buffer->device_type(), DeviceType::CUDA); + ASSERT_EQ(device_buffer->device_type(), DeviceType::kCUDA); // View back the device buffer on the CPU. This should roundtrip. ASSERT_OK_AND_ASSIGN(auto buffer, Buffer::View(device_buffer, cpu_mm_)); @@ -396,7 +396,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_EQ(buffer->address(), host_buffer->address()); ASSERT_EQ(buffer->size(), host_buffer->size()); ASSERT_EQ(buffer->parent(), device_buffer); - ASSERT_EQ(buffer->device_type(), DeviceType::CUDA_HOST); + ASSERT_EQ(buffer->device_type(), DeviceType::kCUDA_HOST); } // ------------------------------------------------------------------------ From a858ccba2f5fda1e4425501ec5abbc677e0fe4ef Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 17 Jul 2023 12:17:38 -0400 Subject: [PATCH 08/16] addressing feedback --- cpp/src/arrow/buffer.h | 8 ++++---- cpp/src/arrow/buffer_test.cc | 16 ++++++++-------- cpp/src/arrow/c/bridge.cc | 31 +++++++++++++++---------------- cpp/src/arrow/c/bridge.h | 29 +++++++++++++++-------------- cpp/src/arrow/device.h | 8 ++++---- cpp/src/arrow/gpu/cuda_context.cc | 2 +- cpp/src/arrow/gpu/cuda_context.h | 2 +- cpp/src/arrow/gpu/cuda_memory.cc | 2 +- cpp/src/arrow/gpu/cuda_test.cc | 8 ++++---- 9 files changed, 53 insertions(+), 53 deletions(-) diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 01629cfaa158a..55c8f820948e0 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -63,13 +63,13 @@ class ARROW_EXPORT Buffer { data_(data), size_(size), capacity_(size), - device_type_(DeviceType::kCPU) { + device_type_(DeviceAllocationType::kCPU) { SetMemoryManager(default_cpu_memory_manager()); } Buffer(const uint8_t* data, int64_t size, std::shared_ptr mm, std::shared_ptr parent = NULLPTR, - std::optional device_type = std::nullopt) + std::optional device_type = std::nullopt) : is_mutable_(false), data_(data), size_(size), @@ -254,7 +254,7 @@ class ARROW_EXPORT Buffer { const std::shared_ptr& memory_manager() const { return memory_manager_; } - std::optional device_type() const { return device_type_; } + std::optional device_type() const { return device_type_; } std::shared_ptr parent() const { return parent_; } @@ -310,7 +310,7 @@ class ARROW_EXPORT Buffer { const uint8_t* data_; int64_t size_; int64_t capacity_; - std::optional device_type_; + std::optional device_type_; // null by default, but may be set std::shared_ptr parent_; diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc index 3520a862ae68f..3dd95cb8af5c6 100644 --- a/cpp/src/arrow/buffer_test.cc +++ b/cpp/src/arrow/buffer_test.cc @@ -41,7 +41,7 @@ using internal::checked_cast; using internal::checked_pointer_cast; static const char kMyDeviceTypeName[] = "arrowtest::MyDevice"; -static const DeviceType kMyDeviceType = DeviceType::kEXT_DEV; +static const DeviceAllocationType kMyDeviceType = DeviceAllocationType::kEXT_DEV; static const int kMyDeviceAllowCopy = 1; static const int kMyDeviceAllowView = 2; @@ -71,7 +71,7 @@ class MyDevice : public Device { return checked_cast(other).value_ == value_; } - DeviceType device_type() const override { return kMyDeviceType; } + DeviceAllocationType device_type() const override { return kMyDeviceType; } std::shared_ptr default_memory_manager() override; @@ -259,7 +259,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), cpu_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); + ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -267,7 +267,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), cpu_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); + ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -297,7 +297,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), my_copy_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); + ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -305,7 +305,7 @@ TEST_F(TestDevice, Copy) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_NE(buffer->address(), my_copy_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); + ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -341,7 +341,7 @@ TEST_F(TestDevice, View) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_EQ(buffer->address(), cpu_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); + ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); @@ -361,7 +361,7 @@ TEST_F(TestDevice, View) { ASSERT_EQ(buffer->device(), cpu_device_); ASSERT_TRUE(buffer->is_cpu()); ASSERT_EQ(buffer->address(), my_copy_src_->address()); - ASSERT_EQ(buffer->device_type(), DeviceType::kCPU); + ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU); ASSERT_NE(buffer->data(), nullptr); AssertBufferEqual(*buffer, "some data"); diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 679168143b863..e8bacf53e233a 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -547,7 +547,8 @@ void ReleaseExportedArray(struct ArrowArray* array) { } DCHECK_NE(array->private_data, nullptr); auto* pdata = reinterpret_cast(array->private_data); - if (pdata->sync_event_.sync_event != nullptr && pdata->sync_event_.release_func != nullptr) { + if (pdata->sync_event_.sync_event != nullptr && + pdata->sync_event_.release_func != nullptr) { pdata->sync_event_.release_func(pdata->sync_event_.sync_event); } delete pdata; @@ -673,7 +674,7 @@ Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out, ////////////////////////////////////////////////////////////////////////// // C device arrays -Status ValidateDeviceInfo(const ArrayData& data, std::optional* device_type, +Status ValidateDeviceInfo(const ArrayData& data, std::optional* device_type, int64_t* device_id) { for (const auto& buf : data.buffers) { if (!buf) { @@ -704,16 +705,16 @@ Status ValidateDeviceInfo(const ArrayData& data, std::optional* devi return Status::OK(); } -Result, int64_t>> ValidateDeviceInfo(const ArrayData& data) { - std::optional device_type; +Result, int64_t>> ValidateDeviceInfo( + const ArrayData& data) { + std::optional device_type; int64_t device_id = -1; RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id)); return std::make_pair(device_type, device_id); } -Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event, - struct ArrowDeviceArray* out, - struct ArrowSchema* out_schema) { +Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event, + struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { if (sync_event.sync_event != nullptr && sync_event.release_func) { return Status::Invalid( "Must provide a release event function if providing a non-null event"); @@ -744,8 +745,7 @@ Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event, return Status::OK(); } -Status ExportDeviceRecordBatch(const RecordBatch& batch, - RawSyncEvent sync_event, +Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event, struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) { if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) { @@ -1388,7 +1388,7 @@ class ImportedBuffer : public Buffer { : Buffer(data, size), import_(std::move(import)) {} ImportedBuffer(const uint8_t* data, int64_t size, std::shared_ptr mm, - DeviceType device_type, std::shared_ptr import) + DeviceAllocationType device_type, std::shared_ptr import) : Buffer(data, size, mm, nullptr, device_type), import_(std::move(import)) {} ~ImportedBuffer() override {} @@ -1401,17 +1401,16 @@ struct ArrayImporter { explicit ArrayImporter(const std::shared_ptr& type) : type_(type), zero_size_buffer_(std::make_shared(kZeroSizeArea, 0)), - device_type_(DeviceType::kCPU) {} + device_type_(DeviceAllocationType::kCPU) {} Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper& mapper) { - ARROW_ASSIGN_OR_RAISE(memory_mgr_, - mapper(src->device_type, src->device_id)); - device_type_ = static_cast(src->device_type); + ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, src->device_id)); + device_type_ = static_cast(src->device_type); RETURN_NOT_OK(Import(&src->array)); import_->sync_event_ = src->sync_event; // reset internal state before next import memory_mgr_.reset(); - device_type_ = DeviceType::kCPU; + device_type_ = DeviceAllocationType::kCPU; return Status::OK(); } @@ -1758,7 +1757,7 @@ struct ArrayImporter { std::shared_ptr zero_size_buffer_; std::shared_ptr memory_mgr_; - DeviceType device_type_; + DeviceAllocationType device_type_; }; } // namespace diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index e273d6890f1cf..337f4446a5b8c 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -198,7 +198,7 @@ struct RawSyncEvent { /// \param[out] out C struct to export the array to /// \param[out] out_schema optional C struct to export the array type to ARROW_EXPORT -Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event, +Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event, struct ArrowDeviceArray* out, struct ArrowSchema* out_schema = NULLPTR); @@ -216,16 +216,16 @@ Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event, /// non-null. If the sync_event is null, then the sync_release parameter is ignored. /// /// \param[in] batch Record batch to export -/// \param[in] sync_event A struct containing what is needed for syncing if necessary -/// \param[out] out C struct where to export the record batch +/// \param[in] sync_event A struct containing what is needed for syncing if necessary +/// \param[out] out C struct where to export the record batch /// \param[out] out_schema optional C struct where to export the record batch schema ARROW_EXPORT Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event, struct ArrowDeviceArray* out, struct ArrowSchema* out_schema = NULLPTR); -using DeviceMemoryMapper = std::function>(ArrowDeviceType, int64_t)>; - +using DeviceMemoryMapper = + std::function>(ArrowDeviceType, int64_t)>; // ARROW_EXPORT // Result> DefaultMemoryMapper(ArrowDeviceType device_type, @@ -239,8 +239,8 @@ using DeviceMemoryMapper = std::function>( /// /// \param[in,out] array C data interface struct holding the array data /// \param[in] type type of the imported array -/// \param[in] mapper An object with a `get_manager` method to map a device type to a -/// memory manager \return Imported array object +/// \param[in] mapper A function to map device + id to memory manager +/// \return Imported array object ARROW_EXPORT Result> ImportDeviceArray(struct ArrowDeviceArray* array, std::shared_ptr type, @@ -255,8 +255,8 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, /// /// \param[in,out] array C data interface struct holding the array data /// \param[in,out] type C data interface struct holding the array type -/// \param[in] mapper An object with a `get_manager` method to map a device type to a -/// memory manager \return Imported array object +/// \param[in] mapper A function to map device + id to memory manager +/// \return Imported array object ARROW_EXPORT Result> ImportDeviceArray(struct ArrowDeviceArray* array, struct ArrowSchema* type, @@ -272,8 +272,8 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, /// /// \param[in,out] array C data interface struct holding the record batch data /// \param[in] schema schema of the imported record batch -/// \param[in] mapper An object with a `get_manager` method to map a device type to a -/// memory manager \return Imported record batch object +/// \param[in] mapper A function to map device + id to memory manager +/// \return Imported record batch object ARROW_EXPORT Result> ImportDeviceRecordBatch( struct ArrowArray* array, std::shared_ptr schema, @@ -291,11 +291,12 @@ Result> ImportDeviceRecordBatch( /// /// \param[in,out] array C data interface struct holding the record batch data /// \param[in,out] schema C data interface struct holding the record batch schema -/// \param[in] mapper An object with a `get_manager` method to map a device type to a -/// memory manager \return Imported record batch object +/// \param[in] mapper A function to map device + id to memory manager +/// \return Imported record batch object ARROW_EXPORT Result> ImportDeviceRecordBatch( - struct ArrowArray* array, struct ArrowSchema* schema, const DeviceMemoryMapper& mapper); + struct ArrowArray* array, struct ArrowSchema* schema, + const DeviceMemoryMapper& mapper); /// @} diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index 83145c81370f6..92598a6abe510 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -30,7 +30,7 @@ namespace arrow { /// \brief EXPERIMENTAL: Device type enum which matches up with C Data Device types -enum class DeviceType : char { +enum class DeviceAllocationType : char { kCPU = 1, kCUDA = 2, kCUDA_HOST = 3, @@ -95,8 +95,8 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this, /// MemoryManager instances with non-default parameters. virtual std::shared_ptr default_memory_manager() = 0; - /// \brief Return the DeviceType of this device - virtual DeviceType device_type() const = 0; + /// \brief Return the DeviceAllocationType of this device + virtual DeviceAllocationType device_type() const = 0; protected: ARROW_DISALLOW_COPY_AND_ASSIGN(Device); @@ -199,7 +199,7 @@ class ARROW_EXPORT CPUDevice : public Device { const char* type_name() const override; std::string ToString() const override; bool Equals(const Device&) const override; - DeviceType device_type() const override { return DeviceType::kCPU; } + DeviceAllocationType device_type() const override { return DeviceAllocationType::kCPU; } std::shared_ptr default_memory_manager() override; diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index 027b1a5bec129..ab224c8186e33 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -384,7 +384,7 @@ Result> CudaMemoryManager::ViewBufferTo( if (to->is_cpu()) { // Device-on-CPU view ARROW_ASSIGN_OR_RAISE(auto address, GetHostAddress(buf->address())); - return std::make_shared(address, buf->size(), to, buf, DeviceType::kCUDA_HOST); + return std::make_shared(address, buf->size(), to, buf, DeviceAllocationType::kCUDA_HOST); } return nullptr; } diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index ab7f414136464..ae07ef94ebb0a 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -92,7 +92,7 @@ class ARROW_EXPORT CudaDevice : public Device { std::string ToString() const override; bool Equals(const Device&) const override; std::shared_ptr default_memory_manager() override; - DeviceType device_type() const override { return DeviceType::kCUDA; } + DeviceAllocationType device_type() const override { return DeviceAllocationType::kCUDA; } int64_t device_id() const override { return device_number(); } /// \brief Return a CudaDevice instance for a particular device diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 7aa7cedc06492..af295ac00cc21 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -200,7 +200,7 @@ Result> CudaBuffer::ExportForIpc() { CudaHostBuffer::CudaHostBuffer(uint8_t* data, const int64_t size) : MutableBuffer(data, size) { - device_type_ = DeviceType::kCUDA_HOST; + device_type_ = DeviceAllocationType::kCUDA_HOST; } CudaHostBuffer::~CudaHostBuffer() { diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc index 62ffb60cacfa3..6d392213e231f 100644 --- a/cpp/src/arrow/gpu/cuda_test.cc +++ b/cpp/src/arrow/gpu/cuda_test.cc @@ -364,7 +364,7 @@ TEST_F(TestCudaHostBuffer, AllocateGlobal) { ASSERT_TRUE(host_buffer->is_cpu()); ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_); - ASSERT_EQ(host_buffer->device_type(), DeviceType::kCUDA_HOST); + ASSERT_EQ(host_buffer->device_type(), DeviceAllocationType::kCUDA_HOST); ASSERT_OK_AND_ASSIGN(auto device_address, host_buffer->GetDeviceAddress(context_)); ASSERT_NE(device_address, 0); @@ -377,7 +377,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_TRUE(host_buffer->is_cpu()); ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_); - ASSERT_EQ(host_buffer->device_type(), DeviceType::kCUDA_HOST); + ASSERT_EQ(host_buffer->device_type(), DeviceAllocationType::kCUDA_HOST); // Try to view the host buffer on the device. This should correspond to // GetDeviceAddress() in the previous test. @@ -387,7 +387,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_NE(device_buffer->address(), 0); ASSERT_EQ(device_buffer->size(), host_buffer->size()); ASSERT_EQ(device_buffer->parent(), host_buffer); - ASSERT_EQ(device_buffer->device_type(), DeviceType::kCUDA); + ASSERT_EQ(device_buffer->device_type(), DeviceAllocationType::kCUDA); // View back the device buffer on the CPU. This should roundtrip. ASSERT_OK_AND_ASSIGN(auto buffer, Buffer::View(device_buffer, cpu_mm_)); @@ -396,7 +396,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) { ASSERT_EQ(buffer->address(), host_buffer->address()); ASSERT_EQ(buffer->size(), host_buffer->size()); ASSERT_EQ(buffer->parent(), device_buffer); - ASSERT_EQ(buffer->device_type(), DeviceType::kCUDA_HOST); + ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCUDA_HOST); } // ------------------------------------------------------------------------ From 6e7de4982dfd3ecc3379cee356fe4ac3f6516b4a Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 17 Jul 2023 13:01:46 -0400 Subject: [PATCH 09/16] clang-format stuff --- cpp/src/arrow/c/bridge.cc | 6 ++++-- cpp/src/arrow/device.h | 2 +- cpp/src/arrow/gpu/cuda_context.cc | 3 ++- cpp/src/arrow/gpu/cuda_context.h | 4 +++- cpp/src/arrow/gpu/cuda_memory.cc | 15 +++++++-------- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index e8bacf53e233a..13355dd6d05ae 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -674,7 +674,8 @@ Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out, ////////////////////////////////////////////////////////////////////////// // C device arrays -Status ValidateDeviceInfo(const ArrayData& data, std::optional* device_type, +Status ValidateDeviceInfo(const ArrayData& data, + std::optional* device_type, int64_t* device_id) { for (const auto& buf : data.buffers) { if (!buf) { @@ -1388,7 +1389,8 @@ class ImportedBuffer : public Buffer { : Buffer(data, size), import_(std::move(import)) {} ImportedBuffer(const uint8_t* data, int64_t size, std::shared_ptr mm, - DeviceAllocationType device_type, std::shared_ptr import) + DeviceAllocationType device_type, + std::shared_ptr import) : Buffer(data, size, mm, nullptr, device_type), import_(std::move(import)) {} ~ImportedBuffer() override {} diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index 92598a6abe510..837681fab33db 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -30,7 +30,7 @@ namespace arrow { /// \brief EXPERIMENTAL: Device type enum which matches up with C Data Device types -enum class DeviceAllocationType : char { +enum class DeviceAllocationType : char { kCPU = 1, kCUDA = 2, kCUDA_HOST = 3, diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index ab224c8186e33..869ea6453ccda 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -384,7 +384,8 @@ Result> CudaMemoryManager::ViewBufferTo( if (to->is_cpu()) { // Device-on-CPU view ARROW_ASSIGN_OR_RAISE(auto address, GetHostAddress(buf->address())); - return std::make_shared(address, buf->size(), to, buf, DeviceAllocationType::kCUDA_HOST); + return std::make_shared(address, buf->size(), to, buf, + DeviceAllocationType::kCUDA_HOST); } return nullptr; } diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index ae07ef94ebb0a..a1b95c7b4181d 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -92,7 +92,9 @@ class ARROW_EXPORT CudaDevice : public Device { std::string ToString() const override; bool Equals(const Device&) const override; std::shared_ptr default_memory_manager() override; - DeviceAllocationType device_type() const override { return DeviceAllocationType::kCUDA; } + DeviceAllocationType device_type() const override { + return DeviceAllocationType::kCUDA; + } int64_t device_id() const override { return device_number(); } /// \brief Return a CudaDevice instance for a particular device diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index af295ac00cc21..860c6311d7b2f 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -488,17 +488,16 @@ Result GetHostAddress(uintptr_t device_ptr) { Result> DefaultMemoryMapper(ArrowDeviceType device_type, int64_t device_id) { switch (device_type) { - case ARROW_DEVICE_CPU: - return default_cpu_memory_manager(); - case ARROW_DEVICE_CUDA: - case ARROW_DEVICE_CUDA_HOST: - case ARROW_DEVICE_CUDA_MANAGED: - { + case ARROW_DEVICE_CPU: + return default_cpu_memory_manager(); + case ARROW_DEVICE_CUDA: + case ARROW_DEVICE_CUDA_HOST: + case ARROW_DEVICE_CUDA_MANAGED: { ARROW_ASSIGN_OR_RAISE(auto device, arrow::cuda::CudaDevice::Make(device_id)); return device->default_memory_manager(); } - default: - return Status::NotImplemented("memory manager not implemented for device"); + default: + return Status::NotImplemented("memory manager not implemented for device"); } } From f0deaabd57ea639d38ba7355b9abdedbf993a729 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 18 Jul 2023 12:03:07 -0400 Subject: [PATCH 10/16] fix lint --- cpp/src/arrow/c/bridge.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 337f4446a5b8c..6a89d168951c2 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -179,7 +179,7 @@ Result> ImportRecordBatch(struct ArrowArray* array, /// array. It's the responsibility of the release function for the array /// to release the event. Both can be null if no sync'ing is necessary. struct RawSyncEvent { - void* sync_event = nullptr; + void* sync_event = NULL; std::function release_func; }; From 849457bd96cbacdbefc972385eb1e7924b2f1e15 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 19 Jul 2023 10:40:18 -0400 Subject: [PATCH 11/16] Update cpp/src/arrow/buffer.h Co-authored-by: Weston Pace --- cpp/src/arrow/buffer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 55c8f820948e0..427c63d5eedf5 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -75,7 +75,7 @@ class ARROW_EXPORT Buffer { size_(size), capacity_(size), parent_(std::move(parent)) { - // will set device_type from the memory manager + // SetMemoryManager will also set device_type_ SetMemoryManager(std::move(mm)); // if a device type is specified, use that instead. for example: // CUDA_HOST. The CudaMemoryManager will set device_type_ to CUDA, From 9244ea4e83af9dddf1510837277dba9edf8c439e Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 20 Jul 2023 14:55:10 -0400 Subject: [PATCH 12/16] add tests --- cpp/src/arrow/c/bridge.h | 7 +- cpp/src/arrow/c/bridge_test.cc | 393 ++++++++++++++++++++++++++++++++- cpp/src/arrow/device.h | 4 +- 3 files changed, 396 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 6a89d168951c2..eaa88beaf976a 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -227,9 +227,6 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event using DeviceMemoryMapper = std::function>(ArrowDeviceType, int64_t)>; -// ARROW_EXPORT -// Result> DefaultMemoryMapper(ArrowDeviceType device_type, -// int64_t device_id); /// \brief EXPERIMENTAL: Import C++ device array from the C data interface. /// @@ -276,7 +273,7 @@ Result> ImportDeviceArray(struct ArrowDeviceArray* array, /// \return Imported record batch object ARROW_EXPORT Result> ImportDeviceRecordBatch( - struct ArrowArray* array, std::shared_ptr schema, + struct ArrowDeviceArray* array, std::shared_ptr schema, const DeviceMemoryMapper& mapper); /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema @@ -295,7 +292,7 @@ Result> ImportDeviceRecordBatch( /// \return Imported record batch object ARROW_EXPORT Result> ImportDeviceRecordBatch( - struct ArrowArray* array, struct ArrowSchema* schema, + struct ArrowDeviceArray* array, struct ArrowSchema* schema, const DeviceMemoryMapper& mapper); /// @} diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index 5fe7b653c8970..095990536f18b 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -565,6 +565,15 @@ struct ArrayExportChecker { ASSERT_EQ(c_export->children, nullptr); } } + + void operator()(struct ArrowDeviceArray* c_export, const ArrayData& expected_data, + const ArrowDeviceType device_type, const int64_t device_id, + const void* sync_event) { + ASSERT_EQ(c_export->device_type, device_type); + ASSERT_EQ(c_export->device_id, device_id); + ASSERT_EQ(c_export->sync_event, sync_event); + this->operator()(&c_export->array, expected_data); + } }; struct RecordBatchExportChecker { @@ -592,6 +601,15 @@ struct RecordBatchExportChecker { ASSERT_EQ(c_export->children, nullptr); } } + + void operator()(struct ArrowDeviceArray* c_export, const RecordBatch& expected_data, + const ArrowDeviceType device_type, const int64_t device_id, + const void* sync_event) { + ASSERT_EQ(c_export->device_type, device_type); + ASSERT_EQ(c_export->device_id, device_id); + ASSERT_EQ(c_export->sync_event, sync_event); + this->operator()(&c_export->array, expected_data); + } }; class TestArrayExport : public ::testing::Test { @@ -601,7 +619,7 @@ class TestArrayExport : public ::testing::Test { static std::function>()> JSONArrayFactory( std::shared_ptr type, const char* json) { return [=]() { return ArrayFromJSON(type, json); }; - } + } template void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) { @@ -1112,6 +1130,379 @@ TEST_F(TestArrayExport, ExportRecordBatch) { } } +//////////////////////////////////////////////////////////////////////////// +// Device Array Export Tests + +static const char kMyDeviceTypeName[] = "arrowtest::MyDevice"; +static const ArrowDeviceType kMyDeviceType = ARROW_DEVICE_EXT_DEV; + +class MyBuffer final : public MutableBuffer { + public: + using MutableBuffer::MutableBuffer; + + ~MyBuffer() { + default_memory_pool()->Free(const_cast(data_), size_); + } +}; + +class MyMemoryManager : public CPUMemoryManager { + public: + explicit MyMemoryManager(const std::shared_ptr& device) + : CPUMemoryManager(device, default_memory_pool()) {} + + Result> AllocateBuffer(int64_t size) override { + uint8_t* data; + RETURN_NOT_OK(pool_->Allocate(size, &data)); + return std::make_unique(data, size, shared_from_this()); + } + + protected: + Result> CopyBufferFrom( + const std::shared_ptr& buf, const std::shared_ptr& from) override { + return CopyNonOwnedFrom(*buf, from); + } + Result> CopyNonOwnedFrom( + const Buffer& buf, const std::shared_ptr& from) override { + if (!from->is_cpu()) { + return nullptr; + } + + ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf.size())); + if (buf.size() > 0) { + memcpy(dest->mutable_data(), buf.data(), static_cast(buf.size())); + } + return std::move(dest); + } +}; + +class MyDevice : public Device { + public: + explicit MyDevice(int value) : Device(true), value_(value) {} + const char* type_name() const override { return kMyDeviceTypeName; } + std::string ToString() const override { return kMyDeviceTypeName; } + bool Equals(const Device& other) const override { + if (other.type_name() != kMyDeviceTypeName) { + return false; + } + return checked_cast(other).value_ == value_; + } + DeviceAllocationType device_type() const override { + return static_cast(kMyDeviceType); } + int64_t device_id() const { return value_; } + std::shared_ptr default_memory_manager() override { + return std::make_shared(shared_from_this()); + } + + protected: + int value_; +}; + +class TestDeviceArrayExport : public ::testing::Test { + public: + void SetUp() override { + pool_ = default_memory_pool(); + } + + static Result> ToDeviceData(const std::shared_ptr& mm, const ArrayData& data) { + arrow::BufferVector buffers; + for (const auto& buf : data.buffers) { + if (buf) { + ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm)); + buffers.push_back(dest); + } else { + buffers.push_back(nullptr); + } + } + + arrow::ArrayDataVector children; + for (const auto& child : data.child_data) { + ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child)); + children.push_back(dest); + } + + return ArrayData::Make(data.type, data.length, buffers, children, data.null_count, data.offset); + } + + static Result> ToDevice(const std::shared_ptr& mm, const ArrayData& data) { + ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data)); + return MakeArray(result); + } + + template + static std::function>()> ToDeviceFactory(const std::shared_ptr& mm, ArrayFactory&& factory) { + return [&]() { return ToDevice(mm, *factory()->data()); }; + } + + static std::function>()> JSONArrayFactory( + const std::shared_ptr& mm, std::shared_ptr type, const char* json) { + return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); }; + } + + template + void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) { + auto orig_bytes = pool_->bytes_allocated(); + + std::shared_ptr arr; + ASSERT_OK_AND_ASSIGN(arr, ToResult(factory())); + ARROW_SCOPED_TRACE("type = ", arr->type()->ToString(), + ", array data = ", arr->ToString()); + const ArrayData& data = *arr->data(); // non-owning reference + struct ArrowDeviceArray c_export; + ASSERT_OK(ExportDeviceArray(*arr, {nullptr, nullptr}, &c_export)); + + ArrayExportGuard guard(&c_export.array); + auto new_bytes = pool_->bytes_allocated(); + ASSERT_GT(new_bytes, orig_bytes); + + // Release the shared_ptr, underlying data should be held alive + arr.reset(); + ASSERT_EQ(pool_->bytes_allocated(), new_bytes); + check_func(&c_export, data, kMyDeviceType, 1, nullptr); + + // Release the ArrowArray, underlying data should be destroyed + guard.Release(); + ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); + } + + + template + void TestNested(ArrayFactory&& factory) { + ArrayExportChecker checker; + TestWithArrayFactory(std::forward(factory), checker); + } + + void TestNested(const std::shared_ptr& mm, const std::shared_ptr& type, const char* json) { + TestNested(JSONArrayFactory(mm, type, json)); + } + + template + void TestPrimitive(ArrayFactory&& factory) { + TestNested(std::forward(factory)); + } + + void TestPrimitive(const std::shared_ptr& mm, const std::shared_ptr& type, const char* json) { + TestNested(mm, type, json); + } + + protected: + MemoryPool* pool_; +}; + +TEST_F(TestDeviceArrayExport, Primitive) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + TestPrimitive(mm, int8(), "[1, 2, null, -3]"); + TestPrimitive(mm, int16(), "[1, 2, -3]"); + TestPrimitive(mm, int32(), "[1, 2, null, -3]"); + TestPrimitive(mm, int64(), "[1, 2, -3]"); + TestPrimitive(mm, uint8(), "[1, 2, 3]"); + TestPrimitive(mm, uint16(), "[1, 2, null, 3]"); + TestPrimitive(mm, uint32(), "[1, 2, 3]"); + TestPrimitive(mm, uint64(), "[1, 2, null, 3]"); + + TestPrimitive(mm, boolean(), "[true, false, null]"); + + TestPrimitive(mm, float32(), "[1.5, null]"); + TestPrimitive(mm, float64(), "[1.5, null]"); + + TestPrimitive(mm, fixed_size_binary(3), R"(["foo", "bar", null])"); + TestPrimitive(mm, binary(), R"(["foo", "bar", null])"); + TestPrimitive(mm, large_binary(), R"(["foo", "bar", null])"); + TestPrimitive(mm, utf8(), R"(["foo", "bar", null])"); + TestPrimitive(mm, large_utf8(), R"(["foo", "bar", null])"); + + TestPrimitive(mm, decimal(16, 4), R"(["1234.5670", null])"); + TestPrimitive(mm, decimal256(16, 4), R"(["1234.5670", null])"); + + TestPrimitive(mm, month_day_nano_interval(), R"([[-1, 5, 20], null])"); +} + +TEST_F(TestDeviceArrayExport, PrimitiveSliced) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + auto factory = [=]() { return (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null, -3]")->data()))->Slice(1, 2); }; + TestPrimitive(factory); +} + + +TEST_F(TestDeviceArrayExport, Temporal) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + const char* json = "[1, 2, null, 42]"; + TestPrimitive(mm, date32(), json); + TestPrimitive(mm, date64(), json); + TestPrimitive(mm, time32(TimeUnit::SECOND), json); + TestPrimitive(mm, time32(TimeUnit::MILLI), json); + TestPrimitive(mm, time64(TimeUnit::MICRO), json); + TestPrimitive(mm, time64(TimeUnit::NANO), json); + TestPrimitive(mm, duration(TimeUnit::SECOND), json); + TestPrimitive(mm, duration(TimeUnit::MILLI), json); + TestPrimitive(mm, duration(TimeUnit::MICRO), json); + TestPrimitive(mm, duration(TimeUnit::NANO), json); + TestPrimitive(mm, month_interval(), json); + + TestPrimitive(mm, day_time_interval(), "[[7, 600], null]"); + + json = R"(["1970-01-01","2000-02-29","1900-02-28"])"; + TestPrimitive(mm, timestamp(TimeUnit::SECOND), json); + TestPrimitive(mm, timestamp(TimeUnit::SECOND, "Europe/Paris"), json); + TestPrimitive(mm, timestamp(TimeUnit::MILLI), json); + TestPrimitive(mm, timestamp(TimeUnit::MILLI, "Europe/Paris"), json); + TestPrimitive(mm, timestamp(TimeUnit::MICRO), json); + TestPrimitive(mm, timestamp(TimeUnit::MICRO, "Europe/Paris"), json); + TestPrimitive(mm, timestamp(TimeUnit::NANO), json); + TestPrimitive(mm, timestamp(TimeUnit::NANO, "Europe/Paris"), json); +} + +TEST_F(TestDeviceArrayExport, List) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + TestNested(mm, list(int8()), "[[1, 2], [3, null], null]"); + TestNested(mm, large_list(uint16()), "[[1, 2], [3, null], null]"); + TestNested(mm, fixed_size_list(int64(), 2), "[[1, 2], [3, null], null]"); + + TestNested(mm, list(large_list(int32())), "[[[1, 2], [3], null], null]"); +} + +TEST_F(TestDeviceArrayExport, ListSliced) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + { + auto factory = [=]() { + return (*ToDevice(mm, *ArrayFromJSON(list(int8()), "[[1, 2], [3, null], [4, 5, 6], null]")->data())) + ->Slice(1, 2); + }; + TestNested(factory); + } + { + auto factory = [=]() { + auto values = (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, 3, 4, null, 5, 6, 7, 8]")->data()))->Slice(1, 6); + auto offsets = (*ToDevice(mm, *ArrayFromJSON(int32(), "[0, 2, 3, 5, 6]")->data()))->Slice(2, 4); + return ListArray::FromArrays(*offsets, *values); + }; + TestNested(factory); + } +} + + +TEST_F(TestDeviceArrayExport, Struct) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + const char* data = R"([[1, "foo"], [2, null]])"; + auto type = struct_({field("a", int8()), field("b", utf8())}); + TestNested(mm, type, data); +} + +TEST_F(TestDeviceArrayExport, Map) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + const char* json = R"([[[1, "foo"], [2, null]], [[3, "bar"]]])"; + TestNested(mm, map(int8(), utf8()), json); + TestNested(mm, map(int8(), utf8(), /*keys_sorted=*/true), json); +} + +TEST_F(TestDeviceArrayExport, Union) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + const char* data = "[null, [42, 1], [43, true], [42, null], [42, 2]]"; + // Dense + auto field_a = field("a", int8()); + auto field_b = field("b", boolean(), /*nullable=*/false); + auto type = dense_union({field_a, field_b}, {42, 43}); + TestNested(mm, type, data); + // Sparse + field_a = field("a", int8(), /*nullable=*/false); + field_b = field("b", boolean()); + type = sparse_union({field_a, field_b}, {42, 43}); + TestNested(mm, type, data); +} + +TEST_F(TestDeviceArrayExport, Extension) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + TestPrimitive(ToDeviceFactory(mm, ExampleUuid)); + TestPrimitive(ToDeviceFactory(mm, ExampleSmallint)); + TestPrimitive(ToDeviceFactory(mm, ExampleComplex128)); +} + +TEST_F(TestDeviceArrayExport, ExportArrayAndType) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + struct ArrowSchema c_schema {}; + struct ArrowDeviceArray c_array {}; + SchemaExportGuard schema_guard(&c_schema); + ArrayExportGuard array_guard(&c_array.array); + + auto array = ToDevice(mm, *ArrayFromJSON(int8(), "[1, 2, 3]")->data()).ValueOrDie(); + ASSERT_OK(ExportDeviceArray(*array, {nullptr, nullptr}, &c_array, &c_schema)); + const ArrayData& data = *array->data(); + array.reset(); + ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema)); + ASSERT_FALSE(ArrowArrayIsReleased(&c_array.array)); + ASSERT_EQ(c_schema.format, std::string("c")); + ASSERT_EQ(c_schema.n_children, 0); + ArrayExportChecker checker{}; + checker(&c_array, data, kMyDeviceType, 1, nullptr); +} + +TEST_F(TestDeviceArrayExport, ExportRecordBatch) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + struct ArrowSchema c_schema {}; + struct ArrowDeviceArray c_array {}; + + auto schema = ::arrow::schema( + {field("ints", int16()), field("bools", boolean(), /*nullable=*/false)}); + schema = schema->WithMetadata(key_value_metadata(kMetadataKeys2, kMetadataValues2)); + auto arr0 = ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null]")->data()).ValueOrDie(); + auto arr1 = ToDevice(mm, *ArrayFromJSON(boolean(), "[false, true, false]")->data()).ValueOrDie(); + + auto batch_factory = [&]() { return RecordBatch::Make(schema, 3, {arr0, arr1}); }; + + { + auto batch = batch_factory(); + + ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, &c_schema)); + ArrayExportGuard array_guard(&c_array.array); + RecordBatchExportChecker checker{}; + checker(&c_array, *batch, kMyDeviceType, 1, nullptr); + + // create batch anew, with the same buffer pointers + batch = batch_factory(); + checker(&c_array, *batch, kMyDeviceType, 1, nullptr); + } + { + // Check one can export both schema and record batch at once + auto batch = batch_factory(); + + ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, &c_schema)); + SchemaExportGuard schema_guard(&c_schema); + ArrayExportGuard array_guard(&c_array.array); + ASSERT_EQ(c_schema.format, std::string("+s")); + ASSERT_EQ(c_schema.n_children, 2); + ASSERT_NE(c_schema.metadata, nullptr); + ASSERT_EQ(kEncodedMetadata2, + std::string(c_schema.metadata, kEncodedMetadata2.size())); + RecordBatchExportChecker checker{}; + checker(&c_array, *batch, kMyDeviceType, 1, nullptr); + + // Create batch anew, with the same buffer pointers + batch = batch_factory(); + checker(&c_array, *batch, kMyDeviceType, 1, nullptr); + } +} + //////////////////////////////////////////////////////////////////////////// // Schema import tests diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index 837681fab33db..c256d0ec41530 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -78,8 +78,8 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this, /// \brief A device ID to identify this device if there are multiple of this type. /// - /// If there is no "device_id" equivalent (such as for the main CPU device) - /// returns -1. + /// If there is no "device_id" equivalent (such as for the main CPU device on + /// non-numa systems) returns -1. virtual int64_t device_id() const { return -1; } /// \brief Whether this device is the main CPU device. From f79d2a25caae55be964b6dea3df8ca7d896c257a Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 20 Jul 2023 15:46:30 -0400 Subject: [PATCH 13/16] linting --- cpp/src/arrow/c/bridge.h | 1 - cpp/src/arrow/c/bridge_test.cc | 86 +++++++++++++++++++--------------- cpp/src/arrow/device.h | 2 +- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index eaa88beaf976a..92707a59729fc 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -227,7 +227,6 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event using DeviceMemoryMapper = std::function>(ArrowDeviceType, int64_t)>; - /// \brief EXPERIMENTAL: Import C++ device array from the C data interface. /// /// The ArrowArray struct has its contents moved (as per the C data interface diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index 095990536f18b..bab83aaaa7b6f 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -602,7 +602,7 @@ struct RecordBatchExportChecker { } } - void operator()(struct ArrowDeviceArray* c_export, const RecordBatch& expected_data, + void operator()(struct ArrowDeviceArray* c_export, const RecordBatch& expected_data, const ArrowDeviceType device_type, const int64_t device_id, const void* sync_event) { ASSERT_EQ(c_export->device_type, device_type); @@ -619,7 +619,7 @@ class TestArrayExport : public ::testing::Test { static std::function>()> JSONArrayFactory( std::shared_ptr type, const char* json) { return [=]() { return ArrayFromJSON(type, json); }; - } + } template void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) { @@ -1140,16 +1140,14 @@ class MyBuffer final : public MutableBuffer { public: using MutableBuffer::MutableBuffer; - ~MyBuffer() { - default_memory_pool()->Free(const_cast(data_), size_); - } + ~MyBuffer() { default_memory_pool()->Free(const_cast(data_), size_); } }; class MyMemoryManager : public CPUMemoryManager { public: explicit MyMemoryManager(const std::shared_ptr& device) - : CPUMemoryManager(device, default_memory_pool()) {} - + : CPUMemoryManager(device, default_memory_pool()) {} + Result> AllocateBuffer(int64_t size) override { uint8_t* data; RETURN_NOT_OK(pool_->Allocate(size, &data)); @@ -1158,11 +1156,12 @@ class MyMemoryManager : public CPUMemoryManager { protected: Result> CopyBufferFrom( - const std::shared_ptr& buf, const std::shared_ptr& from) override { - return CopyNonOwnedFrom(*buf, from); + const std::shared_ptr& buf, + const std::shared_ptr& from) override { + return CopyNonOwnedFrom(*buf, from); } Result> CopyNonOwnedFrom( - const Buffer& buf, const std::shared_ptr& from) override { + const Buffer& buf, const std::shared_ptr& from) override { if (!from->is_cpu()) { return nullptr; } @@ -1186,8 +1185,9 @@ class MyDevice : public Device { } return checked_cast(other).value_ == value_; } - DeviceAllocationType device_type() const override { - return static_cast(kMyDeviceType); } + DeviceAllocationType device_type() const override { + return static_cast(kMyDeviceType); + } int64_t device_id() const { return value_; } std::shared_ptr default_memory_manager() override { return std::make_shared(shared_from_this()); @@ -1199,11 +1199,10 @@ class MyDevice : public Device { class TestDeviceArrayExport : public ::testing::Test { public: - void SetUp() override { - pool_ = default_memory_pool(); - } + void SetUp() override { pool_ = default_memory_pool(); } - static Result> ToDeviceData(const std::shared_ptr& mm, const ArrayData& data) { + static Result> ToDeviceData( + const std::shared_ptr& mm, const ArrayData& data) { arrow::BufferVector buffers; for (const auto& buf : data.buffers) { if (buf) { @@ -1215,27 +1214,31 @@ class TestDeviceArrayExport : public ::testing::Test { } arrow::ArrayDataVector children; - for (const auto& child : data.child_data) { + for (const auto& child : data.child_data) { ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child)); children.push_back(dest); } - return ArrayData::Make(data.type, data.length, buffers, children, data.null_count, data.offset); + return ArrayData::Make(data.type, data.length, buffers, children, data.null_count, + data.offset); } - static Result> ToDevice(const std::shared_ptr& mm, const ArrayData& data) { + static Result> ToDevice(const std::shared_ptr& mm, + const ArrayData& data) { ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data)); return MakeArray(result); } template - static std::function>()> ToDeviceFactory(const std::shared_ptr& mm, ArrayFactory&& factory) { + static std::function>()> ToDeviceFactory( + const std::shared_ptr& mm, ArrayFactory&& factory) { return [&]() { return ToDevice(mm, *factory()->data()); }; } static std::function>()> JSONArrayFactory( - const std::shared_ptr& mm, std::shared_ptr type, const char* json) { - return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); }; + const std::shared_ptr& mm, std::shared_ptr type, + const char* json) { + return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); }; } template @@ -1264,14 +1267,14 @@ class TestDeviceArrayExport : public ::testing::Test { ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); } - template void TestNested(ArrayFactory&& factory) { ArrayExportChecker checker; TestWithArrayFactory(std::forward(factory), checker); } - void TestNested(const std::shared_ptr& mm, const std::shared_ptr& type, const char* json) { + void TestNested(const std::shared_ptr& mm, + const std::shared_ptr& type, const char* json) { TestNested(JSONArrayFactory(mm, type, json)); } @@ -1280,9 +1283,10 @@ class TestDeviceArrayExport : public ::testing::Test { TestNested(std::forward(factory)); } - void TestPrimitive(const std::shared_ptr& mm, const std::shared_ptr& type, const char* json) { + void TestPrimitive(const std::shared_ptr& mm, + const std::shared_ptr& type, const char* json) { TestNested(mm, type, json); - } + } protected: MemoryPool* pool_; @@ -1322,11 +1326,13 @@ TEST_F(TestDeviceArrayExport, PrimitiveSliced) { std::shared_ptr device = std::make_shared(1); auto mm = device->default_memory_manager(); - auto factory = [=]() { return (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null, -3]")->data()))->Slice(1, 2); }; - TestPrimitive(factory); + auto factory = [=]() { + return (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null, -3]")->data())) + ->Slice(1, 2); + }; + TestPrimitive(factory); } - TEST_F(TestDeviceArrayExport, Temporal) { std::shared_ptr device = std::make_shared(1); auto mm = device->default_memory_manager(); @@ -1374,22 +1380,27 @@ TEST_F(TestDeviceArrayExport, ListSliced) { { auto factory = [=]() { - return (*ToDevice(mm, *ArrayFromJSON(list(int8()), "[[1, 2], [3, null], [4, 5, 6], null]")->data())) + return (*ToDevice( + mm, *ArrayFromJSON(list(int8()), "[[1, 2], [3, null], [4, 5, 6], null]") + ->data())) ->Slice(1, 2); }; TestNested(factory); } { auto factory = [=]() { - auto values = (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, 3, 4, null, 5, 6, 7, 8]")->data()))->Slice(1, 6); - auto offsets = (*ToDevice(mm, *ArrayFromJSON(int32(), "[0, 2, 3, 5, 6]")->data()))->Slice(2, 4); + auto values = + (*ToDevice(mm, + *ArrayFromJSON(int16(), "[1, 2, 3, 4, null, 5, 6, 7, 8]")->data())) + ->Slice(1, 6); + auto offsets = (*ToDevice(mm, *ArrayFromJSON(int32(), "[0, 2, 3, 5, 6]")->data())) + ->Slice(2, 4); return ListArray::FromArrays(*offsets, *values); }; TestNested(factory); } } - TEST_F(TestDeviceArrayExport, Struct) { std::shared_ptr device = std::make_shared(1); auto mm = device->default_memory_manager(); @@ -1452,7 +1463,7 @@ TEST_F(TestDeviceArrayExport, ExportArrayAndType) { ASSERT_EQ(c_schema.format, std::string("c")); ASSERT_EQ(c_schema.n_children, 0); ArrayExportChecker checker{}; - checker(&c_array, data, kMyDeviceType, 1, nullptr); + checker(&c_array, data, kMyDeviceType, 1, nullptr); } TEST_F(TestDeviceArrayExport, ExportRecordBatch) { @@ -1463,16 +1474,17 @@ TEST_F(TestDeviceArrayExport, ExportRecordBatch) { struct ArrowDeviceArray c_array {}; auto schema = ::arrow::schema( - {field("ints", int16()), field("bools", boolean(), /*nullable=*/false)}); + {field("ints", int16()), field("bools", boolean(), /*nullable=*/false)}); schema = schema->WithMetadata(key_value_metadata(kMetadataKeys2, kMetadataValues2)); auto arr0 = ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null]")->data()).ValueOrDie(); - auto arr1 = ToDevice(mm, *ArrayFromJSON(boolean(), "[false, true, false]")->data()).ValueOrDie(); + auto arr1 = ToDevice(mm, *ArrayFromJSON(boolean(), "[false, true, false]")->data()) + .ValueOrDie(); auto batch_factory = [&]() { return RecordBatch::Make(schema, 3, {arr0, arr1}); }; { auto batch = batch_factory(); - + ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, &c_schema)); ArrayExportGuard array_guard(&c_array.array); RecordBatchExportChecker checker{}; diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index c256d0ec41530..9cc68fe8c82ce 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -78,7 +78,7 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this, /// \brief A device ID to identify this device if there are multiple of this type. /// - /// If there is no "device_id" equivalent (such as for the main CPU device on + /// If there is no "device_id" equivalent (such as for the main CPU device on /// non-numa systems) returns -1. virtual int64_t device_id() const { return -1; } From 5e1939fed4dda82af81bffc96246eb1ffcc9888e Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 21 Jul 2023 11:25:13 -0400 Subject: [PATCH 14/16] update from feedback --- cpp/src/arrow/c/bridge_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index bab83aaaa7b6f..1cc354c2fcc45 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -1180,7 +1180,7 @@ class MyDevice : public Device { const char* type_name() const override { return kMyDeviceTypeName; } std::string ToString() const override { return kMyDeviceTypeName; } bool Equals(const Device& other) const override { - if (other.type_name() != kMyDeviceTypeName) { + if (other.type_name() != kMyDeviceTypeName || other.device_type() != device_type()) { return false; } return checked_cast(other).value_ == value_; From 1ca39b816fe81cba7074652ba17024a7a9afa5db Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 24 Jul 2023 15:36:50 -0400 Subject: [PATCH 15/16] mark `device_id` override to fix compiler warning --- cpp/src/arrow/c/bridge_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index 1cc354c2fcc45..368644898cf8e 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -1188,7 +1188,7 @@ class MyDevice : public Device { DeviceAllocationType device_type() const override { return static_cast(kMyDeviceType); } - int64_t device_id() const { return value_; } + int64_t device_id() const override { return value_; } std::shared_ptr default_memory_manager() override { return std::make_shared(shared_from_this()); } From d90ce9e93f2b13f69484223f3a7bd87829027468 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 24 Jul 2023 17:30:51 -0400 Subject: [PATCH 16/16] add schema export guard for memory leak --- cpp/src/arrow/c/bridge_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index 368644898cf8e..5c7de8e4a0783 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -1486,6 +1486,7 @@ TEST_F(TestDeviceArrayExport, ExportRecordBatch) { auto batch = batch_factory(); ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, &c_schema)); + SchemaExportGuard schema_guard(&c_schema); ArrayExportGuard array_guard(&c_array.array); RecordBatchExportChecker checker{}; checker(&c_array, *batch, kMyDeviceType, 1, nullptr);