ARROW-3451: [C++/Python] pyarrow and numba CUDA interop #2732

Closed (wants to merge 35 commits)

Commits: changes shown from 14 of 35 commits
f16148f
ARROW-3430: [Packaging] Add workaround to verify 0.11.0
kou Oct 4, 2018
b629401
ARROW-3431: [GLib] Include Gemfile to archive
kou Oct 4, 2018
cf3aebe
ARROW-3432: [Packaging] Expand variables in commit message
kou Oct 4, 2018
5284ad0
ARROW-3331: [Gandiva][C++] Add re2 to toolchain
Oct 4, 2018
4e20aa9
ARROW-3438: [Packaging] Fix too much Markdown escape in CHANGELOG
kou Oct 5, 2018
50aebe6
Implement interop between pyarrow and numba.
pearu Oct 9, 2018
cba9bcf
Merging with upstream/master
pearu Oct 9, 2018
ef69f35
Implement interop between pyarrow and numba.
pearu Oct 9, 2018
a42ffd1
Merge branch 'arrow-cuda' of github.com:Quansight/arrow into arrow-cuda
pearu Oct 9, 2018
3a1e564
Fix lint issues.
pearu Oct 9, 2018
1107aad
pyarrow and numba CUDA interop
pearu Oct 9, 2018
1478909
Implement a view of device memory (CudaBuffer.foreign_buffer)
pearu Oct 10, 2018
0920105
lost it git messages
pearu Oct 10, 2018
cbe7338
Fix lint-clang formatting issues.
pearu Oct 10, 2018
740e474
Apply feedback from pitrou
pearu Oct 14, 2018
1bf7856
Moved numba jitted function inside the test function.
pearu Oct 15, 2018
f576474
Fix compiler warning re casting pointers with different sizes.
pearu Oct 15, 2018
bdc00ce
Introduce GetSharedContext and use_numba_context.
pearu Oct 16, 2018
42fc232
Fix make lint failures.
pearu Oct 16, 2018
c1b5717
Fix bug that enabled using numba context manager always.
pearu Oct 16, 2018
f27796a
Restore previous context after cuCtxSetCurrent. Remove use_numba_cont…
pearu Oct 16, 2018
cc87f3b
Fix lint issues.
pearu Oct 16, 2018
a6d5376
Introduce ContextSaver. Fix doc strings. Set fixed seed for cuda tests.
pearu Oct 17, 2018
cfb1e74
Fix lint issue.
pearu Oct 17, 2018
c546ce5
Synchronize before copying device data to host.
pearu Oct 18, 2018
415cbd6
Introduce synchronize methods.
pearu Oct 18, 2018
f9e69b2
Fix check-format issues.
pearu Oct 18, 2018
1b0c19e
Implemented context push-pop pattern.
pearu Oct 18, 2018
3e99126
Implemented context push-pop pattern. 2nd try.
pearu Oct 18, 2018
767df42
Introduce device argument to AllocateHost. Use sync in IPC test.
pearu Oct 18, 2018
39d3f69
Introduce CloseIpcBuffer, fixes IPC test.
pearu Oct 18, 2018
fc5f505
Expose CudaContext device number for CudaBufferWriter in buffering mode.
pearu Oct 18, 2018
8457e00
Use C++ style cast.
pearu Oct 23, 2018
b6f4ede
Remove contexts_ cache and related methods CreateNewContext and Creat…
pearu Oct 23, 2018
b503b0c
Update docstring for GetContext
pitrou Oct 24, 2018
31 changes: 31 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.cc
@@ -44,11 +44,20 @@ class CudaContext::CudaContextImpl {

Status Init(const CudaDevice& device) {
device_ = device;
own_context_ = true;
CU_RETURN_NOT_OK(cuCtxCreate(&context_, 0, device_.handle));
is_open_ = true;
return Status::OK();
}

Status InitShared(const CudaDevice& device, CUcontext ctx) {
device_ = device;
own_context_ = false;
context_ = ctx;
is_open_ = true;
return Status::OK();
}

Status Close() {
if (is_open_ && own_context_) {
CU_RETURN_NOT_OK(cuCtxDestroy(context_));
@@ -110,6 +119,8 @@ class CudaContext::CudaContextImpl {

const CudaDevice device() const { return device_; }

const void* context_handle() const { return reinterpret_cast<void*>(context_); }

private:
CudaDevice device_;
CUcontext context_;
@@ -165,6 +176,13 @@ class CudaDeviceManager::CudaDeviceManagerImpl {
return (*out)->impl_->Init(devices_[device_number]);
}

Status CreateSharedContext(int device_number, CUcontext ctx,
std::shared_ptr<CudaContext>* out) {
// TODO: check if context exists already, if so, return it.
*out = std::shared_ptr<CudaContext>(new CudaContext());
return (*out)->impl_->InitShared(devices_[device_number], ctx);
}

Status GetContext(int device_number, std::shared_ptr<CudaContext>* out) {
auto it = contexts_.find(device_number);
if (it == contexts_.end()) {
@@ -212,6 +230,11 @@ Status CudaDeviceManager::CreateNewContext(int device_number,
return impl_->CreateNewContext(device_number, out);
}

Status CudaDeviceManager::CreateSharedContext(int device_number, void* ctx,
std::shared_ptr<CudaContext>* out) {
return impl_->CreateSharedContext(device_number, (CUcontext)ctx, out);
}

Status CudaDeviceManager::AllocateHost(int64_t nbytes,
std::shared_ptr<CudaHostBuffer>* out) {
uint8_t* data = nullptr;
@@ -240,6 +263,12 @@ Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out) {
return Status::OK();
}

Status CudaContext::View(uint8_t* data, int64_t nbytes,
std::shared_ptr<CudaBuffer>* out) {
*out = std::make_shared<CudaBuffer>(data, nbytes, this->shared_from_this(), false);
return Status::OK();
}

Status CudaContext::ExportIpcBuffer(void* data,
std::shared_ptr<CudaIpcMemHandle>* handle) {
return impl_->ExportIpcBuffer(data, handle);
@@ -276,5 +305,7 @@ Status CudaContext::OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,

int64_t CudaContext::bytes_allocated() const { return impl_->bytes_allocated(); }

const void* CudaContext::handle() const { return impl_->context_handle(); }

} // namespace gpu
} // namespace arrow
17 changes: 17 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.h
@@ -44,6 +44,13 @@ class ARROW_EXPORT CudaDeviceManager {
/// In general code will use GetContext
Status CreateNewContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);

/// \brief Create shared context for a given device number
(Review thread)

Member:
I admit I don't understand the difference between "get shared context" and "create shared context".

Contributor Author (pearu):
"Create shared context" creates a CudaContext with a context handle that is owned by (or was created by) another library.
"Get shared context" performs "Create shared context" if the CudaContext for the given device has not been created yet, and caches the CudaContext instance; otherwise it returns the cached instance.
This follows the same logic as the GetContext and CreateNewContext methods.

Member:
Are all those variations actually useful? Isn't GetSharedContext enough?

Contributor Author (pearu):
The difference between the values returned by GetSharedContext and GetContext is that in the latter case the created context is owned by the arrow context manager (which is allowed to destroy the context in its Close method).
For the shared case, the context wrapped by CudaContext is assumed to be managed by another library.

Member:
But why do we need both GetSharedContext and CreateSharedContext?

Contributor Author (pearu):
GetSharedContext is what users should use in their code; CreateSharedContext does not need to be exposed to users. Similarly, CreateNewContext does not need to be exposed either, IMHO.
However, whoever designed CudaContext must have had something in mind when making CreateNewContext public. There is also the CudaContext::Close method, which is currently not used anywhere in Arrow...
So, I don't know the answer. I am fine with making CreateSharedContext private.

(End of review thread)

/// \param[in] device_number
/// \param[in] handle CUDA context handler created by another library
/// \param[out] out shared context
Status CreateSharedContext(int device_number, void* handle,
std::shared_ptr<CudaContext>* out);

Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);

Status FreeHost(void* data, int64_t nbytes);
@@ -76,6 +83,13 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
/// \return Status
Status Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out);

/// \brief Create a view of CUDA memory on GPU device of this context
/// \param[in] data the starting device address
/// \param[in] nbytes number of bytes
/// \param[out] out the view buffer
/// \return Status
Status View(uint8_t* data, int64_t nbytes, std::shared_ptr<CudaBuffer>* out);

/// \brief Open existing CUDA IPC memory handle
/// \param[in] ipc_handle opaque pointer to CUipcMemHandle (driver API)
/// \param[out] buffer a CudaBuffer referencing
@@ -85,6 +99,9 @@

int64_t bytes_allocated() const;

/// \brief Expose CUDA context handle to other libraries
const void* handle() const;

private:
CudaContext();

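As discussed in the review thread above, CreateSharedContext only wraps a CUcontext that is owned by another library; Arrow never destroys it. A minimal sketch of how this surfaces through the Python Context constructor added below, assuming numba is installed, a CUDA device is available, and using numba's current_context() helper:

    import numba.cuda
    from pyarrow import cuda

    # numba owns the driver context; pyarrow only borrows its handle.
    nb_ctx = numba.cuda.current_context()
    ctx = cuda.Context(device_number=nb_ctx.device.id,
                       handle=nb_ctx.handle.value)
    assert ctx.handle == nb_ctx.handle.value
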
115 changes: 112 additions & 3 deletions python/pyarrow/_cuda.pyx
@@ -26,15 +26,17 @@ cdef class Context:
""" CUDA driver context.
"""

def __cinit__(self, int device_number=0):
def __cinit__(self, int device_number=0, uintptr_t handle=0):
"""Construct the shared CUDA driver context for a particular device.

Parameters
----------
device_number : int
Specify the gpu device for which the CUDA driver context is
requested.

handle : void_p
Show resolved Hide resolved
Specify handle for a shared context that has been created by
another library.
"""
cdef CCudaDeviceManager* manager
check_status(CCudaDeviceManager.GetInstance(&manager))
@@ -43,9 +45,57 @@ cdef class Context:
self.context.reset()
raise ValueError('device_number argument must be '
'non-negative less than %s' % (n))
check_status(manager.GetContext(device_number, &self.context))
if handle == 0:
check_status(manager.GetContext(device_number, &self.context))
else:
check_status(manager.CreateSharedContext(device_number,
<void*>handle,
&self.context))
self.device_number = device_number

@staticmethod
def from_numba(context=None):
"""Create Context instance from a numba CUDA context.

Parameters
----------
context : {numba.cuda.cudadrv.driver.Context, None}
Specify numba CUDA context instance. When None, use the
current numba context.

Returns
-------
shared_context : pyarrow.cuda.Context
Context instance.
"""
if context is None:
import numba.cuda
context = numba.cuda.cudadrv.devices.get_context()
return Context(device_number=context.device.id,
handle=context.handle.value)

def to_numba(self):
"""Convert Context to numba CUDA context.

Returns
-------
context : numba.cuda.cudadrv.driver.Context
Numba CUDA context instance.
"""
import ctypes
import numba.cuda
device = numba.cuda.gpus[self.device_number]
handle = ctypes.c_void_p(self.handle)
context = numba.cuda.cudadrv.driver.Context(device, handle)

class DummyPendingDeallocs(object):
# Context is managed by pyarrow
def add_item(self, *args, **kwargs):
pass

context.deallocations = DummyPendingDeallocs()
return context
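
A usage sketch for the from_numba/to_numba pair above (illustrative only; assumes numba is installed and a CUDA device is available):

    from pyarrow import cuda

    ctx = cuda.Context.from_numba()   # wrap the current numba context
    nb_ctx = ctx.to_numba()           # hand the same handle back to numba
    assert ctx.handle == nb_ctx.handle.value

Note that deallocation bookkeeping on the returned numba context is disabled (see DummyPendingDeallocs above).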

@staticmethod
def get_num_devices():
""" Return the number of GPU devices.
@@ -60,6 +110,12 @@
"""
return self.device_number

@property
def handle(self):
""" Return pointer to context handle.
"""
return <uintptr_t>self.context.get().handle()

cdef void init(self, const shared_ptr[CCudaContext]& ctx):
self.context = ctx

@@ -86,6 +142,34 @@
check_status(self.context.get().Allocate(nbytes, &cudabuf))
return pyarrow_wrap_cudabuffer(cudabuf)

def foreign_buffer(self, address, size):
"""Create device buffer from device address and size as a view.

The caller is responsible for allocating and freeing the
memory as well as ensuring that the memory belongs to the
CUDA context that this Context instance holds.

Parameters
----------
address : intptr_t
Specify the starting address of the buffer.
size : int
Specify the size of device buffer in bytes.

Returns
-------
cbuf : CudaBuffer
Device buffer as a view of device memory.
"""
cdef:
intptr_t c_addr = address
int64_t c_size = size
shared_ptr[CCudaBuffer] cudabuf
check_status(self.context.get().View(<uint8_t*>c_addr,
c_size,
&cudabuf))
return pyarrow_wrap_cudabuffer(cudabuf)
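
A sketch of viewing a numba-owned allocation through foreign_buffer (illustrative only; memalloc and the MemoryPointer attributes below are numba driver APIs, and the allocation stays owned and freed by numba):

    import numba.cuda
    from pyarrow import cuda

    nb_ctx = numba.cuda.current_context()
    mem = nb_ctx.memalloc(128)                 # allocated and freed by numba
    ctx = cuda.Context.from_numba(nb_ctx)
    cbuf = ctx.foreign_buffer(mem.device_pointer.value, mem.size)
    assert cbuf.size == 128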

def open_ipc_buffer(self, ipc_handle):
""" Open existing CUDA IPC memory handle

@@ -232,6 +316,31 @@ cdef class CudaBuffer(Buffer):
check_status(CCudaBuffer.FromBuffer(buf_, &cbuf))
return pyarrow_wrap_cudabuffer(cbuf)

@staticmethod
def from_numba(mem):
"""Create a CudaBuffer view from numba MemoryPointer instance.

Parameters
----------
mem : numba.cuda.cudadrv.driver.MemoryPointer

Returns
-------
cbuf : CudaBuffer
Device buffer as a view of numba MemoryPointer.
"""
ctx = Context.from_numba(mem.context)
return ctx.foreign_buffer(mem.device_pointer.value, mem.size)

def to_numba(self):
"""Return numba memory pointer of CudaBuffer instance.
"""
import ctypes
from numba.cuda.cudadrv.driver import MemoryPointer
return MemoryPointer(self.context.to_numba(),
pointer=ctypes.c_void_p(self.address),
size=self.size)
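
A round-trip sketch for the two helpers above (illustrative only; assumes a CUDA device and that numba's DeviceNDArray exposes its backing MemoryPointer as gpu_data):

    import numpy as np
    import numba.cuda
    from pyarrow import cuda

    arr = np.arange(10, dtype=np.uint8)
    darr = numba.cuda.to_device(arr)                  # numba-owned device array
    cbuf = cuda.CudaBuffer.from_numba(darr.gpu_data)  # zero-copy view
    np.testing.assert_equal(
        np.frombuffer(cbuf.copy_to_host(), dtype=np.uint8), arr)
    mem = cbuf.to_numba()                             # back to a numba MemoryPointer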

cdef getitem(self, int64_t i):
return self.copy_to_host(position=i, nbytes=1)[0]

7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow_cuda.pxd
@@ -27,6 +27,9 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::gpu" nogil:
CStatus GetContext(int gpu_number, shared_ptr[CCudaContext]* ctx)
# CStatus CreateNewContext(int gpu_number,
# shared_ptr[CCudaContext]* ctx)
CStatus CreateSharedContext(int gpu_number,
void* handle,
shared_ptr[CCudaContext]* ctx)
CStatus AllocateHost(int64_t nbytes,
shared_ptr[CCudaHostBuffer]* buffer)
# CStatus FreeHost(void* data, int64_t nbytes)
@@ -36,9 +39,13 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::gpu" nogil:
shared_ptr[CCudaContext] shared_from_this()
# CStatus Close()
CStatus Allocate(int64_t nbytes, shared_ptr[CCudaBuffer]* out)
CStatus View(uint8_t* data,
int64_t nbytes,
shared_ptr[CCudaBuffer]* out)
CStatus OpenIpcBuffer(const CCudaIpcMemHandle& ipc_handle,
shared_ptr[CCudaBuffer]* buffer)
int64_t bytes_allocated() const
const void* handle() const

cdef cppclass CCudaIpcMemHandle" arrow::gpu::CudaIpcMemHandle":
@staticmethod
2 changes: 1 addition & 1 deletion python/pyarrow/tests/test_cuda.py
@@ -203,7 +203,7 @@ def test_context_device_buffer():
np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)

cudabuf = global_context.buffer_from_data(
buf.slice(offset=soffset, length=ssize))
buf.slice(offset=soffset, length=ssize))
assert cudabuf.size == ssize
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)