Skip to content

Commit 84e4525

Browse files
committed
Add classes and methods for simplifying use of CUDA IPC machinery. No tests yet
Change-Id: Ib46b646219e83c35828cf19a5e8d3bc8cc096f25
1 parent 508febb commit 84e4525

File tree

8 files changed

+179
-44
lines changed

8 files changed

+179
-44
lines changed

cpp/src/arrow/gpu/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ ADD_ARROW_LIB(arrow_gpu
4545
)
4646

4747
install(FILES
48-
cuda_common.h
48+
cuda_api.h
49+
cuda_context.h
4950
cuda_memory.h
5051
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
5152

cpp/src/arrow/gpu/cuda-benchmark.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#include "arrow/memory_pool.h"
2626
#include "arrow/test-util.h"
2727

28-
#include "arrow/gpu/cuda_memory.h"
28+
#include "arrow/gpu/cuda_api.h"
2929

3030
namespace arrow {
3131
namespace gpu {
@@ -41,7 +41,7 @@ static void CudaBufferWriterBenchmark(benchmark::State& state, const int64_t tot
4141
ABORT_NOT_OK(manager->GetContext(kGpuNumber, &context));
4242

4343
std::shared_ptr<CudaBuffer> device_buffer;
44-
ABORT_NOT_OK(AllocateCudaBuffer(total_bytes, context, &device_buffer));
44+
ABORT_NOT_OK(context->Allocate(total_bytes, &device_buffer));
4545
CudaBufferWriter writer(device_buffer);
4646

4747
if (buffer_size > 0) {

cpp/src/arrow/gpu/cuda-test.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
#include "arrow/status.h"
2525
#include "arrow/test-util.h"
2626

27-
#include "arrow/gpu/cuda_memory.h"
27+
#include "arrow/gpu/cuda_api.h"
2828

2929
namespace arrow {
3030
namespace gpu {
@@ -51,7 +51,7 @@ class TestCudaBuffer : public TestCudaBufferBase {
5151
// Checks that a device buffer of kSize bytes can be allocated through the
// fixture's context and that the resulting buffer reports that size.
// (The commit switches the allocation from the free function
// AllocateCudaBuffer to the CudaContext::Allocate member.)
TEST_F(TestCudaBuffer, Allocate) {
5252
const int64_t kSize = 100;
5353
std::shared_ptr<CudaBuffer> buffer;
54-
ASSERT_OK(AllocateCudaBuffer(kSize, context_, &buffer));
54+
ASSERT_OK(context_->Allocate(kSize, &buffer));
5555
ASSERT_EQ(kSize, buffer->size());
5656
}
5757

@@ -66,7 +66,7 @@ void AssertCudaBufferEquals(const CudaBuffer& buffer, const uint8_t* host_data,
6666
TEST_F(TestCudaBuffer, CopyFromHost) {
6767
const int64_t kSize = 1000;
6868
std::shared_ptr<CudaBuffer> device_buffer;
69-
ASSERT_OK(AllocateCudaBuffer(kSize, context_, &device_buffer));
69+
ASSERT_OK(context_->Allocate(kSize, &device_buffer));
7070

7171
std::shared_ptr<PoolBuffer> host_buffer;
7272
ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
@@ -82,7 +82,7 @@ class TestCudaBufferWriter : public TestCudaBufferBase {
8282
void SetUp() { TestCudaBufferBase::SetUp(); }
8383

8484
void Allocate(const int64_t size) {
85-
ASSERT_OK(AllocateCudaBuffer(size, context_, &device_buffer_));
85+
ASSERT_OK(context_->Allocate(size, &device_buffer_));
8686
writer_.reset(new CudaBufferWriter(device_buffer_));
8787
}
8888

@@ -189,7 +189,7 @@ TEST_F(TestCudaBufferReader, Basics) {
189189
std::shared_ptr<CudaBuffer> device_buffer;
190190

191191
const int64_t size = 1000;
192-
ASSERT_OK(AllocateCudaBuffer(size, context_, &device_buffer));
192+
ASSERT_OK(context_->Allocate(size, &device_buffer));
193193

194194
std::shared_ptr<PoolBuffer> buffer;
195195
ASSERT_OK(test::MakeRandomBytePoolBuffer(1000, default_memory_pool(), &buffer));

cpp/src/arrow/gpu/cuda_api.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
<<<<<<< HEAD
12
// Licensed to the Apache Software Foundation (ASF) under one
23
// or more contributor license agreements. See the NOTICE file
34
// distributed with this work for additional information
@@ -18,6 +19,7 @@
1819
#ifndef ARROW_GPU_CUDA_API_H
1920
#define ARROW_GPU_CUDA_API_H
2021

22+
#include "arrow/gpu/cuda_context.h"
2123
#include "arrow/gpu/cuda_memory.h"
2224
#include "arrow/gpu/cuda_version.h"
2325

cpp/src/arrow/gpu/cuda_context.cc

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,17 @@ class CudaContext::CudaContextImpl {
8888
return Status::OK();
8989
}
9090

91+
// Open the device memory described by `ipc_handle` in this context and return
// its device address through `out`.
// NOTE(review): CU_RETURN_NOT_OK is defined outside this hunk; it presumably
// turns a failing CUresult into an early Status return -- confirm.
Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle, uint8_t** out) {
92+
// Make this object's context current before issuing the driver calls below.
CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
93+
// handle() hands back an opaque const void*; view it as the driver handle type.
auto handle = reinterpret_cast<const CUipcMemHandle*>(ipc_handle.handle());
94+
95+
CUdeviceptr data;
96+
CU_RETURN_NOT_OK(
97+
cuIpcOpenMemHandle(&data, *handle, CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS));
98+
// Expose the device address as a byte pointer, matching the other APIs here.
*out = reinterpret_cast<uint8_t*>(data);
99+
return Status::OK();
100+
}
101+
91102
const CudaDevice device() const { return device_; }
92103

93104
private:
@@ -203,8 +214,11 @@ CudaContext::CudaContext() { impl_.reset(new CudaContextImpl()); }
203214

204215
CudaContext::~CudaContext() {}
205216

206-
Status CudaContext::Allocate(int64_t nbytes, uint8_t** out) {
207-
return impl_->Allocate(nbytes, out);
217+
// Allocate `nbytes` through the implementation object and wrap the result in
// a CudaBuffer that keeps this context alive and owns the allocation
// (fourth constructor argument, own_data, is true).
// NOTE(review): shared_from_this() requires that this CudaContext is already
// managed by a std::shared_ptr; the constructor is private, so presumably
// only CudaDeviceManager creates instances that way -- confirm.
Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out) {
218+
uint8_t* data = nullptr;
219+
RETURN_NOT_OK(impl_->Allocate(nbytes, &data));
220+
*out = std::make_shared<CudaBuffer>(data, nbytes, this->shared_from_this(), true);
221+
return Status::OK();
208222
}
209223

210224
Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
@@ -219,5 +233,20 @@ Status CudaContext::Free(uint8_t* device_ptr, int64_t nbytes) {
219233
return impl_->Free(device_ptr, nbytes);
220234
}
221235

236+
// Open the memory behind `ipc_handle` and return it as a CudaBuffer
// constructed with own_data = true and is_ipc = true, so that
// CudaBuffer::Close() releases it with cuIpcCloseMemHandle.
// NOTE(review): the header declares the output parameter as `buffer`, while
// this definition names it `out`.
// NOTE(review): if cuMemGetAddressRange fails, the function returns (assuming
// CU_RETURN_NOT_OK returns early) without closing the mapping that was just
// opened into `data` -- confirm whether a cuIpcCloseMemHandle is needed on
// that path.
Status CudaContext::OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,
237+
std::shared_ptr<CudaBuffer>* out) {
238+
uint8_t* data = nullptr;
239+
RETURN_NOT_OK(impl_->OpenIpcBuffer(ipc_handle, &data));
240+
241+
// Need to ask the device how big the buffer is
// (the IPC handle itself carries no size; the base-address output is not
// requested, hence the nullptr first argument)
242+
size_t allocation_size = 0;
243+
CU_RETURN_NOT_OK(cuMemGetAddressRange(nullptr, &allocation_size,
244+
reinterpret_cast<CUdeviceptr>(data)));
245+
246+
*out = std::make_shared<CudaBuffer>(data, allocation_size, this->shared_from_this(),
247+
true, true);
248+
return Status::OK();
249+
}
250+
222251
} // namespace gpu
223252
} // namespace arrow

cpp/src/arrow/gpu/cuda_context.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,11 @@
2424
#include "arrow/status.h"
2525
#include "arrow/util/visibility.h"
2626

27+
#include "arrow/gpu/cuda_memory.h"
28+
2729
namespace arrow {
2830
namespace gpu {
2931

30-
class CudaBuffer;
31-
class CudaHostBuffer;
32-
3332
// Forward declaration
3433
class CudaContext;
3534

@@ -41,6 +40,7 @@ class ARROW_EXPORT CudaDeviceManager {
4140
Status GetContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
4241

4342
Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);
43+
4444
Status FreeHost(uint8_t* data, int64_t nbytes);
4545

4646
int num_devices() const;
@@ -59,26 +59,40 @@ struct ARROW_EXPORT CudaDeviceInfo {};
5959

6060
/// \class CudaContext
6161
/// \brief Friendlier interface to the CUDA driver API
62-
class ARROW_EXPORT CudaContext {
62+
class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext> {
6363
public:
6464
~CudaContext();
6565

6666
Status Destroy();
6767

68-
Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes);
69-
Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes);
68+
/// \brief Allocate CUDA memory on GPU device for this context
69+
/// \param[in] nbytes number of bytes
70+
/// \param[out] out the allocated buffer
71+
/// \return Status
72+
Status Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out);
7073

71-
Status Allocate(int64_t nbytes, uint8_t** out);
72-
Status Free(uint8_t* device_ptr, int64_t nbytes);
74+
/// \brief Open existing CUDA IPC memory handle
75+
/// \param[in] ipc_handle wrapper holding the CUipcMemHandle (driver API) to open
76+
/// \param[out] buffer a CudaBuffer referencing the device memory opened from the handle
77+
/// \return Status
78+
Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,
79+
std::shared_ptr<CudaBuffer>* buffer);
7380

7481
int64_t bytes_allocated() const;
7582

7683
private:
7784
CudaContext();
7885

86+
Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes);
87+
Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes);
88+
Status Free(uint8_t* device_ptr, int64_t nbytes);
89+
7990
class CudaContextImpl;
8091
std::unique_ptr<CudaContextImpl> impl_;
8192

93+
friend CudaBuffer;
94+
friend CudaBufferReader;
95+
friend CudaBufferWriter;
8296
friend CudaDeviceManager::CudaDeviceManagerImpl;
8397
};
8498

cpp/src/arrow/gpu/cuda_memory.cc

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
#include <algorithm>
2121
#include <cstdint>
22+
#include <cstdlib>
2223
#include <memory>
2324

25+
#include <cuda.h>
26+
2427
#include "arrow/buffer.h"
2528
#include "arrow/io/memory.h"
2629
#include "arrow/status.h"
@@ -32,15 +35,69 @@
3235
namespace arrow {
3336
namespace gpu {
3437

35-
CudaBuffer::~CudaBuffer() {
38+
// ----------------------------------------------------------------------
39+
// CUDA IPC memory handle
40+
41+
// Private implementation of CudaIpcMemHandle: stores a by-value copy of the
// driver's CUipcMemHandle so the public header does not need <cuda.h> types.
// NOTE(review): memcpy is declared in <cstring>; the include hunk visible for
// this file adds <cstdlib> and <cuda.h> only -- confirm <cstring> is included
// (directly or transitively).
struct CudaIpcMemHandle::CudaIpcMemHandleImpl {
42+
// Copies sizeof(CUipcMemHandle) bytes from `handle`; the caller must supply
// a pointer to at least that many readable bytes (no null or size check here).
explicit CudaIpcMemHandleImpl(const void* handle) {
43+
memcpy(&ipc_handle, handle, sizeof(CUipcMemHandle));
44+
}
45+
46+
CUipcMemHandle ipc_handle;
47+
};
48+
49+
// Build a handle object from raw handle bytes; the bytes are copied into a
// newly allocated CudaIpcMemHandleImpl, so `handle` need not outlive this
// object.
CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
50+
impl_.reset(new CudaIpcMemHandleImpl(handle));
51+
}
52+
53+
// Empty destructor defined in this translation unit, where
// CudaIpcMemHandleImpl is a complete type (presumably so that impl_ can be
// destroyed here rather than in the header -- confirm impl_'s declared type).
CudaIpcMemHandle::~CudaIpcMemHandle() {}
54+
55+
// Create a CudaIpcMemHandle from raw handle bytes (for example, bytes
// produced by Serialize()).
// `opaque_handle` must point to at least sizeof(CUipcMemHandle) bytes; they
// are copied by the constructor. As written this always returns Status::OK().
Status CudaIpcMemHandle::FromBuffer(const void* opaque_handle,
56+
std::unique_ptr<CudaIpcMemHandle>* handle) {
57+
*handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle));
58+
return Status::OK();
59+
}
60+
61+
// Write the raw bytes of the stored CUipcMemHandle into a buffer newly
// allocated from `pool` and return it through `out`.
// The buffer is exactly sizeof(CUipcMemHandle) bytes long. Fails only if
// AllocateBuffer fails.
Status CudaIpcMemHandle::Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const {
62+
std::shared_ptr<MutableBuffer> buffer;
63+
constexpr size_t kHandleSize = sizeof(CUipcMemHandle);
64+
RETURN_NOT_OK(AllocateBuffer(pool, static_cast<int64_t>(kHandleSize), &buffer));
65+
// Byte-for-byte copy; the handle is treated as an opaque blob.
memcpy(buffer->mutable_data(), &impl_->ipc_handle, kHandleSize);
66+
*out = buffer;
67+
return Status::OK();
68+
}
69+
70+
// Returns an untyped pointer to the stored CUipcMemHandle. The pointer refers
// to memory owned by impl_, so it is valid only while this object is alive.
const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }
71+
72+
// ----------------------------------------------------------------------
73+
74+
// Wrap `size` bytes at `data`, associated with `context`.
//   own_data: when true, Close() releases the memory.
//   is_ipc:   when true (and own_data is true), Close() releases it with
//             cuIpcCloseMemHandle instead of context->Free().
// The buffer is always marked mutable and mutable_data_ aliases `data`.
CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
75+
const std::shared_ptr<CudaContext>& context, bool own_data,
76+
bool is_ipc)
77+
: Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
78+
is_mutable_ = true;
79+
mutable_data_ = data;
80+
}
81+
82+
// Releases the buffer by calling Close(); the returned Status is inspected
// only through DCHECK.
// NOTE(review): Close() is the DCHECK argument, so whether the memory is
// released at all depends on DCHECK still evaluating its argument in builds
// where the check is disabled -- confirm against the DCHECK definition.
CudaBuffer::~CudaBuffer() { DCHECK(Close().ok()); }
83+
84+
// Release the device memory if this buffer owns it: IPC-flagged buffers are
// closed with cuIpcCloseMemHandle, all others are returned to the context
// with Free(). Non-owning buffers return Status::OK() without doing anything.
// NOTE(review): own_data_ is not cleared after a successful release, and the
// destructor calls Close() again, so an explicit Close() followed by
// destruction releases the same pointer twice.
// NOTE(review): ExportForIpc() sets is_ipc_ = true on a buffer obtained from
// CudaContext::Allocate; such a buffer then takes the cuIpcCloseMemHandle
// branch here instead of context_->Free() -- confirm this is intended.
// NOTE(review): the IPC branch calls the driver directly without making
// context_ current first -- confirm the right context is current here.
Status CudaBuffer::Close() {
3685
if (own_data_) {
37-
DCHECK(context_->Free(mutable_data_, size_).ok());
86+
if (is_ipc_) {
87+
CU_RETURN_NOT_OK(cuIpcCloseMemHandle(reinterpret_cast<CUdeviceptr>(mutable_data_)));
88+
} else {
89+
return context_->Free(mutable_data_, size_);
90+
}
3891
}
92+
return Status::OK();
3993
}
4094

4195
// Construct a buffer from `parent`, `offset` and `size` by forwarding them to
// the Buffer(parent, offset, size) base constructor (presumably a view into
// the parent's memory -- confirm against Buffer).
// The new buffer shares the parent's context and is marked non-owning and
// non-IPC, so its Close() returns Status::OK() without releasing anything.
CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
4296
const int64_t size)
43-
: Buffer(parent, offset, size), context_(parent->context()) {}
97+
: Buffer(parent, offset, size),
98+
context_(parent->context()),
99+
own_data_(false),
100+
is_ipc_(false) {}
44101

45102
Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
46103
uint8_t* out) const {
@@ -53,12 +110,15 @@ Status CudaBuffer::CopyFromHost(const int64_t position, const uint8_t* data,
53110
return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
54111
}
55112

56-
Status AllocateCudaBuffer(const int64_t size, const std::shared_ptr<CudaContext>& context,
57-
std::shared_ptr<CudaBuffer>* out) {
58-
DCHECK(context);
59-
uint8_t* data = nullptr;
60-
RETURN_NOT_OK(context->Allocate(size, &data));
61-
*out = std::make_shared<CudaBuffer>(data, size, context);
113+
// Obtain a CUDA IPC handle for this buffer's device memory and return it,
// wrapped in a CudaIpcMemHandle, through `handle`.
// Returns Status::Invalid if is_ipc_ is already set.
// NOTE(review): is_ipc_ is also set for buffers created by
// CudaContext::OpenIpcBuffer (imported memory), so such a buffer is rejected
// here with a message saying it "has already been exported".
// NOTE(review): setting is_ipc_ = true changes how Close() releases an owned
// buffer (cuIpcCloseMemHandle instead of context_->Free()); an exported
// allocation probably still needs Free() -- confirm, and consider a separate
// "exported" flag.
Status CudaBuffer::ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle) {
114+
if (is_ipc_) {
115+
return Status::Invalid("Buffer has already been exported for IPC");
116+
}
117+
CUipcMemHandle cu_handle;
118+
CU_RETURN_NOT_OK(
119+
cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(mutable_data_)));
120+
is_ipc_ = true;
121+
// The CudaIpcMemHandle constructor copies the handle bytes, so passing the
// address of the local cu_handle is safe.
*handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
62122
return Status::OK();
63123
}
64124

0 commit comments

Comments
 (0)