Skip to content

Commit

Permalink
Release the GIL in most HDFS Python methods
Browse files Browse the repository at this point in the history
Change-Id: I88b8ac796a58ea47f0f435ccd4bbfb4b96db5c74
  • Loading branch information
wesm committed Jun 22, 2016
1 parent d00e733 commit a973ee0
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 58 deletions.
4 changes: 2 additions & 2 deletions python/pyarrow/error.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
from pyarrow.includes.libarrow cimport CStatus
from pyarrow.includes.pyarrow cimport *

cdef check_cstatus(const CStatus& status)
cdef check_status(const Status& status)
cdef int check_cstatus(const CStatus& status) nogil except -1
cdef int check_status(const Status& status) nogil except -1
14 changes: 8 additions & 6 deletions python/pyarrow/error.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ from pyarrow.compat import frombytes
class ArrowException(Exception):
pass

cdef check_cstatus(const CStatus& status):
cdef int check_cstatus(const CStatus& status) nogil except -1:
if status.ok():
return
return 0

cdef c_string c_message = status.ToString()
raise ArrowException(frombytes(c_message))
with gil:
raise ArrowException(frombytes(c_message))

cdef check_status(const Status& status):
cdef int check_status(const Status& status) nogil except -1:
if status.ok():
return
return 0

cdef c_string c_message = status.ToString()
raise ArrowException(frombytes(c_message))
with gil:
raise ArrowException(frombytes(c_message))
121 changes: 71 additions & 50 deletions python/pyarrow/io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ cdef class HDFSClient:

def close(self):
self._ensure_client()
check_cstatus(self.client.get().Disconnect())
with nogil:
check_cstatus(self.client.get().Disconnect())
self.is_open = False

cdef _ensure_client(self):
Expand Down Expand Up @@ -102,11 +103,15 @@ cdef class HDFSClient:
-------
client : HDFSClient
"""
cdef HDFSClient out = HDFSClient()
cdef:
c_string c_host = tobytes(host)
c_string c_user = tobytes(user)
int c_port = port
HDFSClient out = HDFSClient()

check_cstatus(
CHDFSClient.Connect(tobytes(host), port, tobytes(user),
&out.client))
with nogil:
check_cstatus(
CHDFSClient.Connect(c_host, c_port, c_user, &out.client))
out.is_open = True

return out
Expand All @@ -119,7 +124,10 @@ cdef class HDFSClient:
self._ensure_client()

cdef c_string c_path = tobytes(path)
return self.client.get().Exists(c_path)
cdef c_bool result
with nogil:
result = self.client.get().Exists(c_path)
return result

def ls(self, path, bint full_info=True):
"""
Expand All @@ -144,8 +152,9 @@ cdef class HDFSClient:

self._ensure_client()

check_cstatus(self.client.get()
.ListDirectory(c_path, &sp_listing))
with nogil:
check_cstatus(self.client.get()
.ListDirectory(c_path, &sp_listing))

listing = sp_listing.get()

Expand Down Expand Up @@ -184,10 +193,11 @@ cdef class HDFSClient:
self._ensure_client()

cdef c_string c_path = tobytes(path)
check_cstatus(self.client.get()
.CreateDirectory(c_path))
with nogil:
check_cstatus(self.client.get()
.CreateDirectory(c_path))

def delete(self, path, recursive=False):
def delete(self, path, bint recursive=False):
"""
Delete the indicated file or directory
Expand All @@ -200,8 +210,9 @@ cdef class HDFSClient:
self._ensure_client()

cdef c_string c_path = tobytes(path)
check_cstatus(self.client.get()
.Delete(c_path, recursive))
with nogil:
check_cstatus(self.client.get()
.Delete(c_path, recursive))

def open(self, path, mode='rb', buffer_size=None, replication=None,
default_block_size=None):
Expand All @@ -221,33 +232,33 @@ cdef class HDFSClient:
cdef c_bool append = False

# 0 in libhdfs means "use the default"
buffer_size = buffer_size or 0
cdef int32_t c_buffer_size = buffer_size or 0
cdef int16_t c_replication = replication or 0
cdef int64_t c_default_block_size = default_block_size or 0

if mode in ('wb', 'ab'):
if mode == 'ab':
append = True

replication = replication or 0
default_block_size = default_block_size or 0

check_cstatus(
self.client.get()
.OpenWriteable(c_path, append,
buffer_size, replication,
default_block_size,
&out.wr_file))
with nogil:
check_cstatus(
self.client.get()
.OpenWriteable(c_path, append, c_buffer_size,
c_replication, c_default_block_size,
&out.wr_file))

out.is_readonly = False
else:
check_cstatus(self.client.get()
.OpenReadable(c_path, &out.rd_file))
with nogil:
check_cstatus(self.client.get()
.OpenReadable(c_path, &out.rd_file))
out.is_readonly = True

if buffer_size == 0:
buffer_size = 2 ** 16
if c_buffer_size == 0:
c_buffer_size = 2 ** 16

out.mode = mode
out.buffer_size = buffer_size
out.buffer_size = c_buffer_size
out.parent = self
out.is_open = True

Expand Down Expand Up @@ -297,7 +308,7 @@ cdef class HDFSClient:

writer_thread.join()

def download(self, path, stream, buffer_size=2**16):
def download(self, path, stream, buffer_size=None):
f = self.open(path, 'rb', buffer_size=buffer_size)
f.download(stream)

Expand All @@ -322,10 +333,11 @@ cdef class HDFSFile:
self.close()

def close(self):
if self.is_readonly:
check_cstatus(self.rd_file.get().Close())
else:
check_cstatus(self.wr_file.get().Close())
with nogil:
if self.is_readonly:
check_cstatus(self.rd_file.get().Close())
else:
check_cstatus(self.wr_file.get().Close())
self.is_open = False

cdef _assert_readable(self):
Expand All @@ -338,15 +350,17 @@ cdef class HDFSFile:

def tell(self):
cdef int64_t position
if self.is_readonly:
check_cstatus(self.rd_file.get().Tell(&position))
else:
check_cstatus(self.wr_file.get().Tell(&position))
with nogil:
if self.is_readonly:
check_cstatus(self.rd_file.get().Tell(&position))
else:
check_cstatus(self.wr_file.get().Tell(&position))
return position

def seek(self, position):
def seek(self, int64_t position):
self._assert_readable()
check_cstatus(self.rd_file.get().Seek(position))
with nogil:
check_cstatus(self.rd_file.get().Seek(position))

def read(self, int nbytes):
"""
Expand All @@ -369,16 +383,17 @@ cdef class HDFSFile:
cdef int rpc_chunksize = min(self.buffer_size, nbytes)

try:
while total_bytes < nbytes:
check_cstatus(self.rd_file.get()
.Read(rpc_chunksize, &bytes_read,
buf + total_bytes))
with nogil:
while total_bytes < nbytes:
check_cstatus(self.rd_file.get()
.Read(rpc_chunksize, &bytes_read,
buf + total_bytes))

total_bytes += bytes_read
total_bytes += bytes_read

# EOF
if bytes_read == 0:
break
# EOF
if bytes_read == 0:
break
result = cp.PyBytes_FromStringAndSize(<const char*>buf,
total_bytes)
finally:
Expand Down Expand Up @@ -440,9 +455,13 @@ cdef class HDFSFile:
cdef int64_t total_bytes = 0

try:
i = 0
while True:
check_cstatus(self.rd_file.get()
.Read(self.buffer_size, &bytes_read, buf))
with nogil:
check_cstatus(self.rd_file.get()
.Read(self.buffer_size, &bytes_read, buf))

i += 1

total_bytes += bytes_read

Expand Down Expand Up @@ -473,4 +492,6 @@ cdef class HDFSFile:
data = tobytes(data)

cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
check_cstatus(self.wr_file.get().Write(buf, len(data)))
cdef int32_t bufsize = len(data)
with nogil:
check_cstatus(self.wr_file.get().Write(buf, bufsize))

0 comments on commit a973ee0

Please sign in to comment.