Skip to content

Commit

Permalink
Implement HDFSClient.upload and HDFSClient.download to streams
Browse files Browse the repository at this point in the history
  • Loading branch information
wesm committed Jun 21, 2016
1 parent eaad768 commit 0a564b6
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 37 deletions.
107 changes: 93 additions & 14 deletions python/pyarrow/io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ from pyarrow.includes.libarrow_io cimport *
from pyarrow.compat import frombytes, tobytes
from pyarrow.error cimport check_cstatus

cimport cpython as cp

import re
import threading

_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)')


def fifo_queue():
try:
# Python 3
Expand Down Expand Up @@ -69,12 +73,15 @@ cdef class HDFSClient:
self.close()

def close(self):
    """
    Disconnect from the HDFS cluster.

    Raises IOError (via _ensure_client) if the client was never properly
    initialized or has already been closed.
    """
    self._ensure_client()
    check_cstatus(self.client.get().Disconnect())
    # Mark closed so subsequent operations fail fast in _ensure_client
    self.is_open = False

cdef _ensure_client(self):
if self.client.get() == NULL:
raise Exception('HDFS client improperly initialized')
raise IOError('HDFS client improperly initialized')
elif not self.is_open:
raise IOError('HDFS client is closed')

@classmethod
def connect(cls, host, port, user):
Expand Down Expand Up @@ -109,6 +116,8 @@ cdef class HDFSClient:
Returns True if the path is known to the cluster, False if it does not
(or there is an RPC error)
"""
self._ensure_client()

cdef c_string c_path = tobytes(path)
return self.client.get().Exists(c_path)

Expand All @@ -133,6 +142,8 @@ cdef class HDFSClient:
list results = []
int i

self._ensure_client()

check_cstatus(self.client.get()
.ListDirectory(c_path, &sp_listing))

Expand Down Expand Up @@ -170,11 +181,13 @@ cdef class HDFSClient:
"""
Create indicated directory and any necessary parent directories
"""
self._ensure_client()

cdef c_string c_path = tobytes(path)
check_cstatus(self.client.get()
.CreateDirectory(c_path))

def rm(self, path, recursive=False):
def delete(self, path, recursive=False):
"""
Delete the indicated file or directory
Expand All @@ -184,6 +197,8 @@ cdef class HDFSClient:
recursive : boolean, default False
If True, also delete child paths for directories
"""
self._ensure_client()

cdef c_string c_path = tobytes(path)
check_cstatus(self.client.get()
.Delete(c_path, recursive))
Expand All @@ -195,6 +210,8 @@ cdef class HDFSClient:
----------
mode : string, 'rb', 'wb', 'ab'
"""
self._ensure_client()

cdef HDFSFile out = HDFSFile()

if mode not in ('rb', 'wb', 'ab'):
Expand Down Expand Up @@ -236,6 +253,54 @@ cdef class HDFSClient:

return out

def upload(self, path, stream, buffer_size=2**16):
    """
    Upload the contents of a file-like object to an HDFS path.

    Reading and writing proceed concurrently: the calling thread reads
    chunks from `stream` onto a queue while a background thread drains
    the queue and writes to the cluster.

    Parameters
    ----------
    path : string
        Target path in HDFS
    stream : file-like object
        Must support read(nbytes)
    buffer_size : int, default 64K
        Number of bytes to read from `stream` per chunk

    Raises
    ------
    Exception
        Any exception raised by the background writer is re-raised on
        the calling thread after the writer finishes.
    """
    write_queue = fifo_queue()

    f = self.open(path, 'wb')
    # NOTE(review): as before, `f` is not explicitly closed here; relies
    # on the HDFSFile destructor to close the handle -- confirm

    # Exception raised on the writer thread, if any, captured for
    # re-raising on the calling thread
    writer_exc = []

    def bg_write():
        try:
            while True:
                # Queue.get blocks until data is available, so no
                # condition variable is needed. The bare qsize()-check /
                # cv.wait() pattern used previously could miss a
                # notify() issued between the check and the wait (lost
                # wakeup), leaving this thread blocked forever.
                buf = write_queue.get()
                if buf is None:
                    # Sentinel: producer finished, no more chunks coming
                    break
                f.write(buf)
        except Exception as e:
            writer_exc.append(e)

    writer_thread = threading.Thread(target=bg_write)
    writer_thread.start()

    try:
        while True:
            buf = stream.read(buffer_size)
            if not buf:
                break
            write_queue.put(buf)
    finally:
        # Always wake the writer and wait for it to drain the queue,
        # even if stream.read raised
        write_queue.put(None)
        writer_thread.join()

    if writer_exc:
        # Surface errors that occurred while writing to HDFS; the
        # original implementation silently discarded them
        raise writer_exc[0]

def download(self, path, stream, buffer_size=2**16):
    """
    Copy the contents of an HDFS file into a file-like object.

    Parameters
    ----------
    path : string
        Path of the file in HDFS
    stream : file-like object
        Destination; must support write(data)
    buffer_size : int, default 64K
        Read buffer size passed through to open()
    """
    source = self.open(path, 'rb', buffer_size=buffer_size)
    source.download(stream)


cdef class HDFSFile:
cdef:
Expand Down Expand Up @@ -314,14 +379,14 @@ cdef class HDFSFile:
# EOF
if bytes_read == 0:
break
result = cpython.PyBytes_FromStringAndSize(<const char*>buf,
total_bytes)
result = cp.PyBytes_FromStringAndSize(<const char*>buf,
total_bytes)
finally:
free(buf)

return result

def download(self, local_path):
def download(self, stream_or_path):
"""
Read file completely to local path (rather than reading completely into
memory). First seeks to the beginning of the file.
Expand All @@ -331,14 +396,18 @@ cdef class HDFSFile:
uint8_t* buf
self._assert_readable()

from threading import Thread, Condition

cv = Condition()
cv = threading.Condition()
write_queue = fifo_queue()

done = False
def bg_write():
with open(local_path, 'wb') as f:
if not hasattr(stream_or_path, 'read'):
stream = open(stream_or_path, 'wb')
cleanup = lambda: stream.close()
else:
stream = stream_or_path
cleanup = lambda: None
try:
while not done:
# Wait for more work
if write_queue.qsize() == 0:
Expand All @@ -351,11 +420,13 @@ cdef class HDFSFile:

while write_queue.qsize() > 0:
buf = write_queue.get()
f.write(buf)
stream.write(buf)
finally:
cleanup()

self.seek(0)

writer_thread = Thread(target=bg_write)
writer_thread = threading.Thread(target=bg_write)

# This isn't ideal -- PyBytes_FromStringAndSize copies the data from
# the passed buffer, so it's hard for us to avoid doubling the memory
Expand All @@ -379,19 +450,27 @@ cdef class HDFSFile:
if bytes_read == 0:
break

pybuf = cpython.PyBytes_FromStringAndSize(
<const char*>buf, bytes_read)
pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
bytes_read)

write_queue.put(pybuf)
with cv:
cv.notify()
finally:
free(buf)
done = True
with cv:
cv.notify()
free(buf)

writer_thread.join()

def write(self, data):
    """
    Write bytes-like data to the file.

    Parameters
    ----------
    data : bytes or unicode
        Unicode input is encoded to UTF-8 before writing
    """
    self._assert_writeable()

    # Normalize to a bytes object (unicode is encoded to UTF-8)
    data = tobytes(data)

    # Borrowed pointer into the bytes object's internal buffer; `data`
    # stays alive for the duration of the call, so no copy is needed
    cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
    check_cstatus(self.wr_file.get().Write(buf, len(data)))
47 changes: 24 additions & 23 deletions python/pyarrow/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,55 @@
# specific language governing permissions and limitations
# under the License.

from pyarrow.compat import unittest
import pyarrow
import pyarrow.formatting as fmt


class TestArrayAPI(unittest.TestCase):
def test_repr_on_pre_init_array():
    # repr() must produce non-empty output even for an Array that has
    # not been initialized with data
    uninitialized = pyarrow.array.Array()
    text = repr(uninitialized)
    assert len(text) > 0

def test_repr_on_pre_init_array(self):
arr = pyarrow.array.Array()
assert len(repr(arr)) > 0

def test_getitem_NA(self):
arr = pyarrow.from_pylist([1, None, 2])
assert arr[1] is pyarrow.NA
def test_getitem_NA():
    # A null slot is returned as the NA singleton (identity comparison)
    values = pyarrow.from_pylist([1, None, 2])
    item = values[1]
    assert item is pyarrow.NA

def test_list_format(self):
arr = pyarrow.from_pylist([[1], None, [2, 3, None]])
result = fmt.array_format(arr)
expected = """\

def test_list_format():
arr = pyarrow.from_pylist([[1], None, [2, 3, None]])
result = fmt.array_format(arr)
expected = """\
[
[1],
NA,
[2,
3,
NA]
]"""
assert result == expected
assert result == expected


def test_string_format(self):
arr = pyarrow.from_pylist(['', None, 'foo'])
result = fmt.array_format(arr)
expected = """\
def test_string_format():
arr = pyarrow.from_pylist(['', None, 'foo'])
result = fmt.array_format(arr)
expected = """\
[
'',
NA,
'foo'
]"""
assert result == expected
assert result == expected


def test_long_array_format(self):
arr = pyarrow.from_pylist(range(100))
result = fmt.array_format(arr, window=2)
expected = """\
def test_long_array_format():
arr = pyarrow.from_pylist(range(100))
result = fmt.array_format(arr, window=2)
expected = """\
[
0,
1,
...
98,
99
]"""
assert result == expected
assert result == expected

0 comments on commit 0a564b6

Please sign in to comment.