pyarrow and numba CUDA interop
Change-Id: I6057fdc68720c5d1c215ed16eb6aaedf0b67818e
pearu authored and wesm committed Oct 9, 2018
1 parent f4f6269 commit a403400
Showing 5 changed files with 232 additions and 3 deletions.
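
Taken together, the changes let a CUDA driver context and device memory be passed back and forth between pyarrow and numba without copying. A minimal round-trip sketch, condensed from the tests added in this commit (it assumes a CUDA-capable GPU and that both pyarrow.cuda and numba.cuda are importable):

    import numpy as np
    import pyarrow as pa
    from numba.cuda.cudadrv.devicearray import DeviceNDArray
    from pyarrow import cuda

    ctx = cuda.Context()                      # pyarrow-owned CUDA context
    nb_ctx = ctx.to_numba()                   # view it as a numba context
    assert ctx.handle == nb_ctx.handle.value  # same underlying CUcontext

    # Fill a host buffer, copy it into device memory allocated by pyarrow,
    # then wrap that device memory as a numba device array (no extra copy).
    size = 10
    buf = pa.allocate_buffer(size)
    arr = np.frombuffer(buf, dtype=np.uint8)
    arr[:] = np.arange(size, dtype=np.uint8)
    cbuf = ctx.new_buffer(size)
    cbuf.copy_from_host(buf, position=0, nbytes=size)
    darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype,
                         gpu_data=cbuf.to_numba())
    np.testing.assert_equal(darr.copy_to_host(), arr)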
25 changes: 25 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.cc
@@ -44,11 +44,20 @@ class CudaContext::CudaContextImpl {

Status Init(const CudaDevice& device) {
device_ = device;
own_context_ = true;
CU_RETURN_NOT_OK(cuCtxCreate(&context_, 0, device_.handle));
is_open_ = true;
return Status::OK();
}

Status InitShared(const CudaDevice& device, CUcontext ctx) {
device_ = device;
own_context_ = false;
context_ = ctx;
is_open_ = true;
return Status::OK();
}

Status Close() {
if (is_open_ && own_context_) {
CU_RETURN_NOT_OK(cuCtxDestroy(context_));
@@ -110,6 +119,8 @@ class CudaContext::CudaContextImpl {

const CudaDevice device() const { return device_; }

const void* context_handle() const { return (void*)context_; }

private:
CudaDevice device_;
CUcontext context_;
@@ -165,6 +176,12 @@ class CudaDeviceManager::CudaDeviceManagerImpl {
return (*out)->impl_->Init(devices_[device_number]);
}

Status CreateSharedContext(int device_number, CUcontext ctx, std::shared_ptr<CudaContext>* out) {
// TODO: check if context exists already, if so, return it.
*out = std::shared_ptr<CudaContext>(new CudaContext());
return (*out)->impl_->InitShared(devices_[device_number], ctx);
}

Status GetContext(int device_number, std::shared_ptr<CudaContext>* out) {
auto it = contexts_.find(device_number);
if (it == contexts_.end()) {
@@ -212,6 +229,12 @@ Status CudaDeviceManager::CreateNewContext(int device_number,
return impl_->CreateNewContext(device_number, out);
}

Status CudaDeviceManager::CreateSharedContext(int device_number,
void* ctx,
std::shared_ptr<CudaContext>* out) {
return impl_->CreateSharedContext(device_number, (CUcontext)ctx, out);
}

Status CudaDeviceManager::AllocateHost(int64_t nbytes,
std::shared_ptr<CudaHostBuffer>* out) {
uint8_t* data = nullptr;
@@ -276,5 +299,7 @@ Status CudaContext::OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,

int64_t CudaContext::bytes_allocated() const { return impl_->bytes_allocated(); }

const void* CudaContext::handle() const { return impl_->context_handle(); }

} // namespace gpu
} // namespace arrow
9 changes: 9 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.h
@@ -44,6 +44,12 @@ class ARROW_EXPORT CudaDeviceManager {
/// In general code will use GetContext
Status CreateNewContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);

/// \brief Create a shared context for a given device number
/// \param[in] device_number the CUDA device that the context belongs to
/// \param[in] handle CUDA context handle created by another library
/// \param[out] out shared context
Status CreateSharedContext(int device_number, void* handle, std::shared_ptr<CudaContext>* out);

Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);

Status FreeHost(void* data, int64_t nbytes);
@@ -85,6 +91,9 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext

int64_t bytes_allocated() const;

/// \brief Expose CUDA context handle to other libraries
const void* handle() const;

private:
CudaContext();

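
On the Python side these two additions surface as the handle argument of the Context constructor and the Context.handle property (see the _cuda.pyx changes below), so a raw CUcontext can cross library boundaries as a plain integer. A small sketch, assuming a working pyarrow.cuda build; contexts created from a foreign handle are not owned by Arrow and are never destroyed by it:

    from pyarrow import cuda

    ctx = cuda.Context(0)    # default (cached) context for device 0
    raw = ctx.handle         # CUcontext exposed as an integer address

    # Wrap the same driver context in a second Context object; since the
    # handle was created elsewhere, pyarrow records own_context_ = false
    # and will not call cuCtxDestroy on it.
    shared = cuda.Context(device_number=0, handle=raw)
    assert shared.handle == raw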
71 changes: 68 additions & 3 deletions python/pyarrow/_cuda.pyx
@@ -26,15 +26,17 @@ cdef class Context:
""" CUDA driver context.
"""

def __cinit__(self, int device_number=0):
def __cinit__(self, int device_number=0, uintptr_t handle=0):
"""Construct the shared CUDA driver context for a particular device.
Parameters
----------
device_number : int
Specify the gpu device for which the CUDA driver context is
requested.
handle : void_p
Specify handle for a shared context that has been created by
another library.
"""
cdef CCudaDeviceManager* manager
check_status(CCudaDeviceManager.GetInstance(&manager))
@@ -43,9 +45,57 @@
self.context.reset()
raise ValueError('device_number argument must be '
'non-negative less than %s' % (n))
check_status(manager.GetContext(device_number, &self.context))
if handle == 0:
check_status(manager.GetContext(device_number, &self.context))
else:
check_status(manager.CreateSharedContext(device_number,
<void*>handle,
&self.context))
self.device_number = device_number

@staticmethod
def from_numba(context=None):
"""Create Context instance from a numba CUDA context.
Parameters
----------
context : {numba.cuda.cudadrv.driver.Context, None}
Specify numba CUDA context instance. When None, use the
current numba context.
Returns
-------
shared_context : pyarrow.cuda.Context
Context instance.
"""
if context is None:
import numba.cuda
context = numba.cuda.cudadrv.devices.get_context()
return Context(device_number=context.device.id,
handle=context.handle.value)

def to_numba(self):
"""Convert Context to numba CUDA context.
Returns
-------
context : numba.cuda.cudadrv.driver.Context
Numba CUDA context instance.
"""
import ctypes
import numba.cuda
device = numba.cuda.gpus[self.device_number]
handle = ctypes.c_void_p(self.handle)
context = numba.cuda.cudadrv.driver.Context(device, handle)

class DummyPendingDeallocs(object):
# The context is managed by pyarrow; ignore numba's pending deallocations.
def add_item(self, *args, **kwargs):
pass

context.deallocations = DummyPendingDeallocs()
return context

@staticmethod
def get_num_devices():
""" Return the number of GPU devices.
@@ -60,6 +110,12 @@
"""
return self.device_number

@property
def handle(self):
""" Return pointer to context handle.
"""
return <uintptr_t>self.context.get().handle()

cdef void init(self, const shared_ptr[CCudaContext]& ctx):
self.context = ctx

@@ -232,6 +288,15 @@ cdef class CudaBuffer(Buffer):
check_status(CCudaBuffer.FromBuffer(buf_, &cbuf))
return pyarrow_wrap_cudabuffer(cbuf)

def to_numba(self):
"""Return numba memory pointer of CudaBuffer instance.
"""
import ctypes
from numba.cuda.cudadrv.driver import MemoryPointer
return MemoryPointer(self.context.to_numba(),
pointer=ctypes.c_void_p(self.address),
size=self.size)

cdef getitem(self, int64_t i):
return self.copy_to_host(position=i, nbytes=1)[0]

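
The reverse direction, where numba owns the active context, is covered by Context.from_numba together with CudaBuffer.to_numba. A sketch along the lines of the test_numba_context test added below (it assumes a GPU at index 0):

    import numpy as np
    from numba import cuda as nb_cuda
    from numba.cuda.cudadrv.devicearray import DeviceNDArray
    from pyarrow import cuda

    with nb_cuda.gpus[0]:
        nb_ctx = nb_cuda.cudadrv.devices.get_context()  # numba-managed context
        ctx = cuda.Context.from_numba(nb_ctx)           # share it with pyarrow
        cbuf = ctx.new_buffer(10)                       # device memory via pyarrow
        darr = DeviceNDArray((10,), (1,), np.dtype(np.uint8),
                             gpu_data=cbuf.to_numba())
        darr[:] = 42                                    # write through numba
        host = np.frombuffer(cbuf.copy_to_host(), dtype=np.uint8)
        assert (host == 42).all()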
4 changes: 4 additions & 0 deletions python/pyarrow/includes/libarrow_cuda.pxd
@@ -27,6 +27,9 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::gpu" nogil:
CStatus GetContext(int gpu_number, shared_ptr[CCudaContext]* ctx)
# CStatus CreateNewContext(int gpu_number,
# shared_ptr[CCudaContext]* ctx)
CStatus CreateSharedContext(int gpu_number,
void* handle,
shared_ptr[CCudaContext]* ctx)
CStatus AllocateHost(int64_t nbytes,
shared_ptr[CCudaHostBuffer]* buffer)
# CStatus FreeHost(void* data, int64_t nbytes)
@@ -39,6 +42,7 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::gpu" nogil:
CStatus OpenIpcBuffer(const CCudaIpcMemHandle& ipc_handle,
shared_ptr[CCudaBuffer]* buffer)
int64_t bytes_allocated() const
const void* handle() const

cdef cppclass CCudaIpcMemHandle" arrow::gpu::CudaIpcMemHandle":
@staticmethod
126 changes: 126 additions & 0 deletions python/pyarrow/tests/test_cuda_numba_interop.py
@@ -0,0 +1,126 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest
import pyarrow as pa
import numpy as np

cuda = pytest.importorskip("pyarrow.cuda")
nb_cuda = pytest.importorskip("numba.cuda")


def make_random_buffer(size, target='host', ctx=None):
"""Return a host or device buffer with random data.
"""
if target == 'host':
assert size >= 0
buf = pa.allocate_buffer(size)
arr = np.frombuffer(buf, dtype=np.uint8)
arr[:] = np.random.randint(low=0, high=255, size=size, dtype=np.uint8)
return arr, buf
elif target == 'device':
arr, buf = make_random_buffer(size, target='host')
dbuf = ctx.new_buffer(size)
dbuf.copy_from_host(buf, position=0, nbytes=size)
return arr, dbuf
raise ValueError('invalid target value')


def test_numba_memalloc():
from numba.cuda.cudadrv.devicearray import DeviceNDArray

# Create context instances
ctx = cuda.Context()
nb_ctx = ctx.to_numba()
assert ctx.handle == nb_ctx.handle.value
assert ctx.handle == nb_cuda.cudadrv.driver.driver.get_context().value

# Dummy data
size = 10
arr, buf = make_random_buffer(size, target='host')

# Allocate memory using the numba context.
# Warning: this allocation is not reflected in the pyarrow Context
# (e.g. bytes_allocated does not change).
mem = nb_ctx.memalloc(size)
darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
darr[:5] = 99
darr[5:] = 88
np.testing.assert_equal(darr.copy_to_host()[:5], 99)
np.testing.assert_equal(darr.copy_to_host()[5:], 88)

# TODO: access the memory via CudaBuffer
address = mem.device_pointer.value
size = mem.size
print('address,size=', address, size)

mem.free()


def test_pyarrow_memalloc():
from numba.cuda.cudadrv.devicearray import DeviceNDArray

ctx = cuda.Context()

size = 10
arr, cbuf = make_random_buffer(size, target='device', ctx=ctx)

# wrap CudaBuffer with numba device array
mem = cbuf.to_numba()
darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
np.testing.assert_equal(darr.copy_to_host(), arr)


def test_numba_context():
from numba.cuda.cudadrv.devicearray import DeviceNDArray

size = 10
with nb_cuda.gpus[0]:
# context is managed by numba
nb_ctx = nb_cuda.cudadrv.devices.get_context()
ctx = cuda.Context.from_numba(nb_ctx)
arr, cbuf = make_random_buffer(size, target='device', ctx=ctx)
assert cbuf.context.handle == nb_ctx.handle.value
mem = cbuf.to_numba()
darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
np.testing.assert_equal(darr.copy_to_host(), arr)
darr[0] = 99
arr2 = np.frombuffer(cbuf.copy_to_host(), dtype=np.uint8)
assert arr2[0] == 99


def test_pyarrow_jit():
from numba.cuda.cudadrv.devicearray import DeviceNDArray

# Apply a numba.cuda kernel to memory held by a CudaBuffer.
ctx = cuda.Context()
size = 10
arr, cbuf = make_random_buffer(size, target='device', ctx=ctx)

@nb_cuda.jit
def increment_by_one(an_array):
pos = nb_cuda.grid(1)
if pos < an_array.size:
an_array[pos] += 1
threadsperblock = 32
blockspergrid = (arr.size + (threadsperblock - 1)) // threadsperblock
mem = cbuf.to_numba()
darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
increment_by_one[blockspergrid, threadsperblock](darr)
np.testing.assert_equal(np.frombuffer(cbuf.copy_to_host(),
dtype=arr.dtype),
arr + 1)
