Skip to content

Commit

Permalink
Implement whence argument for pyarrow.NativeFile.seek
Browse files Browse the repository at this point in the history
Change-Id: I27045ae9d34c57e40591959b4541a3d60463c458
  • Loading branch information
wesm committed Jul 29, 2017
1 parent 3b14765 commit b057fc2
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 12 deletions.
58 changes: 56 additions & 2 deletions python/pyarrow/io.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,19 @@ cdef class NativeFile:
raise IOError("file not open")

def size(self):
"""
Return file size
"""
cdef int64_t size
self._assert_readable()
with nogil:
check_status(self.rd_file.get().GetSize(&size))
return size

def tell(self):
"""
Return current stream position
"""
cdef int64_t position
with nogil:
if self.is_readable:
Expand All @@ -121,10 +127,46 @@ cdef class NativeFile:
check_status(self.wr_file.get().Tell(&position))
return position

def seek(self, int64_t position):
def seek(self, int64_t position, int whence=0):
"""
Change current file stream position
Parameters
----------
position : int
Byte offset, interpreted relative to value of whence argument
whence : int, default 0
Point of reference for seek offset
Notes
-----
Values of whence:
* 0 -- start of stream (the default); offset should be zero or positive
* 1 -- current stream position; offset may be negative
* 2 -- end of stream; offset is usually negative
Returns
-------
new_position : the new absolute stream position
"""
cdef int64_t offset
self._assert_readable()
with nogil:
check_status(self.rd_file.get().Seek(position))
if whence == 0:
offset = position
elif whence == 1:
check_status(self.rd_file.get().Tell(&offset))
offset = offset + position
elif whence == 2:
check_status(self.rd_file.get().GetSize(&offset))
offset = offset + position
else:
with gil:
raise ValueError("Invalid value of whence: {0}"
.format(whence))
check_status(self.rd_file.get().Seek(offset))

return self.tell()

def write(self, data):
"""
Expand All @@ -144,6 +186,18 @@ cdef class NativeFile:
check_status(self.wr_file.get().Write(buf, bufsize))

def read(self, nbytes=None):
"""
Read indicated number of bytes from file, or read all remaining bytes
if no argument passed
Parameters
----------
nbytes : int, default None
Returns
-------
data : bytes
"""
cdef:
int64_t c_nbytes
int64_t bytes_read = 0
Expand Down
4 changes: 3 additions & 1 deletion python/pyarrow/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
except ImportError:
pass


try:
import pyarrow.plasma as plasma
import pyarrow.plasma as plasma # noqa
defaults['plasma'] = True
except ImportError:
pass


def pytest_configure(config):
pass

Expand Down
9 changes: 9 additions & 0 deletions python/pyarrow/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ def _check_native_file_reader(FACTORY, sample_data):
assert f.tell() == len(data) + 1
assert f.read(5) == b''

# Test whence argument of seek, ARROW-1287
assert f.seek(3) == 3
assert f.seek(3, os.SEEK_CUR) == 6
assert f.tell() == 6

ex_length = len(data) - 2
assert f.seek(-2, os.SEEK_END) == ex_length
assert f.tell() == ex_length


def test_memory_map_reader(sample_disk_data):
_check_native_file_reader(pa.memory_map, sample_disk_data)
Expand Down
15 changes: 6 additions & 9 deletions python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@
from __future__ import division
from __future__ import print_function

import glob
import numpy as np
import os
import pytest
import random
import signal
import subprocess
import sys
import time
import unittest

import pyarrow as pa
import pandas as pd

DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9


def random_name():
return str(random.randint(0, 99999999))

Expand Down Expand Up @@ -160,7 +158,7 @@ def setup_method(self, test_method):

def teardown_method(self, test_method):
# Check that the Plasma store is still alive.
assert self.p.poll() == None
assert self.p.poll() is None
# Kill the plasma store process.
if os.getenv("PLASMA_VALGRIND") == "1":
self.p.send_signal(signal.SIGTERM)
Expand Down Expand Up @@ -227,7 +225,7 @@ def test_create_existing(self):
self.plasma_client.create(object_id, length,
generate_metadata(length))
# TODO(pcm): Introduce a more specific error type here.
except pa.lib.ArrowException as e:
except pa.lib.ArrowException:
pass
else:
assert False
Expand Down Expand Up @@ -270,7 +268,6 @@ def test_get(self):
assert results[i] is None

def test_store_arrow_objects(self):
import pyarrow.plasma as plasma
data = np.random.randn(10, 4)
# Write an arrow object.
object_id = random_object_id()
Expand Down Expand Up @@ -334,7 +331,7 @@ def assert_create_raises_plasma_full(unit_test, size):
partial_size,
size - partial_size)
# TODO(pcm): More specific error here.
except pa.lib.ArrowException as e:
except pa.lib.ArrowException:
pass
else:
# For some reason the above didn't throw an exception, so fail.
Expand Down Expand Up @@ -368,7 +365,7 @@ def test_contains(self):
fake_object_ids = [random_object_id() for _ in range(100)]
real_object_ids = [random_object_id() for _ in range(100)]
for object_id in real_object_ids:
assert self.plasma_client.contains(object_id) == False
assert self.plasma_client.contains(object_id) is False
self.plasma_client.create(object_id, 100)
self.plasma_client.seal(object_id)
assert self.plasma_client.contains(object_id)
Expand All @@ -383,7 +380,7 @@ def test_hash(self):
try:
self.plasma_client.hash(object_id1)
# TODO(pcm): Introduce a more specific error type here
except pa.lib.ArrowException as e:
except pa.lib.ArrowException:
pass
else:
assert False
Expand Down

0 comments on commit b057fc2

Please sign in to comment.