101 changes: 67 additions & 34 deletions tensorboard/compat/tensorflow_stub/io/gfile.py
@@ -93,29 +93,40 @@ def join(self, path, *paths):
"""Join paths with path delimiter."""
return os.path.join(path, *paths)

def read(self, filename, binary_mode=False, size=None, offset=None):
def read(self, filename, binary_mode=False, size=None, continue_from=None):
"""Reads contents of a file to a string.

Args:
filename: string, a path
binary_mode: bool, read as binary if True, otherwise text
size: int, number of bytes or characters to read, otherwise
read all the contents of the file from the offset
offset: int, offset into file to read from, otherwise read
from the very beginning
read all the contents of the file (from the continuation
marker, if present).
continue_from: An opaque value returned from a prior invocation of
`read(...)` marking the last read position, so that reading
may continue from there. Otherwise read from the beginning.

Returns:
Subset of the contents of the file as a string or bytes.
A tuple of `(data, continuation_token)` where `data` provides either
bytes read from the file (if `binary_mode == True`) or the decoded
string representation thereof (otherwise), and `continuation_token`
is an opaque value that can be passed to the next invocation of
`read(...)` in order to continue from the last read position.
"""
mode = "rb" if binary_mode else "r"
encoding = None if binary_mode else "utf8"
offset = None
if continue_from is not None:
offset = continue_from.get("opaque_offset", None)
with io.open(filename, mode, encoding=encoding) as f:
if offset is not None:
f.seek(offset)
if size is not None:
return f.read(size)
else:
return f.read()
data = f.read(size)
# The new offset may not be `offset + len(data)`, due to decoding
# and newline translation.
# So, just measure it in whatever terms the underlying stream uses.
continuation_token = {"opaque_offset": f.tell()}
return (data, continuation_token)
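
For reviewers, a minimal caller-side sketch of the new contract (the `LocalFileSystem()` instance, file name, and chunk size here are assumed for illustration; this is not part of the diff):

```python
# Hypothetical usage of the continuation-token protocol.
fs = LocalFileSystem()
token = None   # start from the beginning of the file
chunks = []
while True:
    data, token = fs.read("events.log", binary_mode=True,
                          size=4096, continue_from=token)
    if not data:
        break  # an empty read means nothing is left
    chunks.append(data)
contents = b"".join(chunks)
```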

def write(self, filename, file_content, binary_mode=False):
"""Writes string file contents to a file, overwriting any
@@ -186,10 +197,10 @@ def stat(self, filename):
# NOTE: Size of the file is given by .st_size as returned from
# os.stat(), but we convert to .length
try:
len = os.stat(compat.as_bytes(filename)).st_size
file_length = os.stat(compat.as_bytes(filename)).st_size
except OSError:
raise errors.NotFoundError(None, None, "Could not find file")
return StatData(len)
return StatData(file_length)


class S3FileSystem(object):
@@ -222,31 +233,47 @@ def join(self, path, *paths):
"""Join paths with a slash."""
return "/".join((path,) + paths)

def read(self, filename, binary_mode=False, size=None, offset=None):
def read(self, filename, binary_mode=False, size=None, continue_from=None):
"""Reads contents of a file to a string.

Args:
filename: string, a path
binary_mode: bool, read as binary if True, otherwise text
size: int, number of bytes or characters to read, otherwise
read all the contents of the file from the offset
offset: int, offset into file to read from, otherwise read
from the very beginning
read all the contents of the file (from the continuation
marker, if present).
continue_from: An opaque value returned from a prior invocation of
`read(...)` marking the last read position, so that reading
may continue from there. Otherwise read from the beginning.

Returns:
Subset of the contents of the file as a string or bytes.
A tuple of `(data, continuation_token)` where `data` provides either
bytes read from the file (if `binary_mode == True`) or the decoded
string representation thereof (otherwise), and `continuation_token`
is an opaque value that can be passed to the next invocation of
`read(...)` in order to continue from the last read position.
"""
s3 = boto3.resource("s3")
bucket, path = self.bucket_and_path(filename)
args = {}
endpoint = 0
if size is not None or offset is not None:
if offset is None:
offset = 0
endpoint = '' if size is None else (offset + size)
if offset != 0 or endpoint != '':
# Asked for a range, so modify the request
args['Range'] = 'bytes={}-{}'.format(offset, endpoint)

# For the S3 case, we use continuation tokens of the form
# {byte_offset: number}
offset = 0
if continue_from is not None:
offset = continue_from.get("byte_offset", 0)

endpoint = ''
if size is not None:
# TODO(orionr): This endpoint risks splitting a multi-byte
# character or splitting \r and \n in the case of CRLFs,
# producing decoding errors below.
endpoint = offset + size

if offset != 0 or endpoint != '':
# Asked for a range, so modify the request
args['Range'] = 'bytes={}-{}'.format(offset, endpoint)

try:
stream = s3.Object(bucket, path).get(**args)['Body'].read()
except botocore.exceptions.ClientError as exc:
@@ -256,8 +283,8 @@ def read(self, filename, binary_mode=False, size=None, offset=None):
# in a second request so we don't check length in all cases.
client = boto3.client("s3")
obj = client.head_object(Bucket=bucket, Key=path)
len = obj['ContentLength']
endpoint = min(len, offset + size)
content_length = obj['ContentLength']
endpoint = min(content_length, offset + size)
if offset == endpoint:
# Asked for no bytes, so just return empty
stream = b''
@@ -266,10 +293,14 @@ def read(self, filename, binary_mode=False, size=None, offset=None):
stream = s3.Object(bucket, path).get(**args)['Body'].read()
else:
raise
# `stream` should contain raw bytes here (i.e., there has been neither
# decoding nor newline translation), so the byte offset increases by
# the expected amount.
continuation_token = {'byte_offset': (offset + len(stream))}
if binary_mode:
return bytes(stream)
return (bytes(stream), continuation_token)
else:
return stream.decode('utf-8')
return (stream.decode('utf-8'), continuation_token)
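
One subtlety worth noting in the range construction above: HTTP byte ranges are inclusive at both ends, so `bytes=offset-(offset + size)` actually requests `size + 1` bytes. Since the continuation token is computed from `len(stream)` rather than from `size`, offsets stay self-consistent regardless. A small sketch with assumed values:

```python
# Sketch of the Range header math used above (offset/size values assumed).
offset, size = 100, 50
args = {}
endpoint = '' if size is None else offset + size   # 150
if offset != 0 or endpoint != '':
    # Inclusive HTTP range: 'bytes=100-150' covers bytes 100..150,
    # i.e. 51 bytes, one more than `size`.
    args['Range'] = 'bytes={}-{}'.format(offset, endpoint)
# With size=None this would instead be 'bytes=100-', i.e. read to EOF.
```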

def write(self, filename, file_content, binary_mode=False):
"""Writes string file contents to a file.
@@ -384,10 +415,13 @@ def __init__(self, filename, mode):
self.filename = compat.as_bytes(filename)
self.fs = get_filesystem(self.filename)
self.fs_supports_append = hasattr(self.fs, 'append')
self.buff_chunk_size = _DEFAULT_BLOCK_SIZE
self.buff = None
# The buffer offset and the buffer chunk size are measured in the
# natural units of the underlying stream, i.e. bytes for binary mode,
# or characters in text mode.
self.buff_chunk_size = _DEFAULT_BLOCK_SIZE
self.buff_offset = 0
self.offset = 0
self.continuation_token = None
self.write_temp = None
self.write_started = False
self.binary_mode = 'b' in mode
@@ -401,15 +435,14 @@ def __exit__(self, *args):
self.close()
self.buff = None
self.buff_offset = 0
self.offset = 0
self.continuation_token = None

def __iter__(self):
return self

def _read_buffer_to_offset(self, new_buff_offset):
old_buff_offset = self.buff_offset
read_size = min(len(self.buff), new_buff_offset) - old_buff_offset
self.offset += read_size
self.buff_offset += read_size
return self.buff[old_buff_offset:old_buff_offset + read_size]

@@ -438,8 +471,8 @@ def read(self, n=None):

# read from filesystem
read_size = max(self.buff_chunk_size, n) if n is not None else None
self.buff = self.fs.read(self.filename, self.binary_mode,
read_size, self.offset)
(self.buff, self.continuation_token) = self.fs.read(
self.filename, self.binary_mode, read_size, self.continuation_token)
self.buff_offset = 0

# add from filesystem
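Putting the pieces of this file together, the buffered `GFile.read` path can be modeled roughly as follows (a simplified sketch with assumed names, not the PR's exact code):

```python
# Simplified model of GFile's buffered read path (illustration only).
class BufferedReader(object):
    def __init__(self, fs, filename, chunk_size=4096):
        self.fs = fs
        self.filename = filename
        self.chunk_size = chunk_size
        self.buff = ''            # last chunk fetched from the filesystem
        self.buff_offset = 0      # how much of `buff` has been consumed
        self.token = None         # opaque filesystem continuation token

    def read(self, n):
        result = ''
        while len(result) < n:
            remaining = self.buff[self.buff_offset:]
            if remaining:
                # Serve as much as possible from the local buffer.
                take = remaining[:n - len(result)]
                result += take
                self.buff_offset += len(take)
                continue
            # Buffer exhausted: fetch the next chunk, resuming from the
            # position recorded in the continuation token.
            self.buff, self.token = self.fs.read(
                self.filename, False, max(self.chunk_size, n), self.token)
            self.buff_offset = 0
            if not self.buff:
                break  # EOF
        return result
```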
20 changes: 15 additions & 5 deletions tensorboard/compat/tensorflow_stub/io/gfile_test.py
@@ -165,16 +165,26 @@ def testReadLines(self):
temp_dir = self.get_temp_dir()
self._CreateDeepDirectoryStructure(temp_dir)
ckpt_path = os.path.join(temp_dir, 'model.ckpt')
ckpt_lines = (

# Note \r\n, which text-mode reads via io.open will automatically replace with \n.
# That substitution desynchronizes character offsets (omitting \r) from
# the underlying byte offsets (counting \r). Multibyte characters would
# similarly cause desynchronization.
raw_ckpt_lines = (
[u'\r\n'] + [u'line {}\r\n'.format(i) for i in range(10)] + [u' ']
)
expected_ckpt_lines = ( # without \r
[u'\n'] + [u'line {}\n'.format(i) for i in range(10)] + [u' ']
)
# Write out \n as newline even on Windows
# Write out newlines as given (i.e., \r\n) regardless of OS, so as to
# test translation on read.
with io.open(ckpt_path, 'w', newline='') as f:
f.write(u''.join(ckpt_lines))
data = u''.join(raw_ckpt_lines)
f.write(data)
with gfile.GFile(ckpt_path, 'r') as f:
f.buff_chunk_size = 4 # Test buffering by reducing chunk size
ckpt_read_lines = list(f)
self.assertEqual(ckpt_lines, ckpt_read_lines)
read_ckpt_lines = list(f)
self.assertEqual(expected_ckpt_lines, read_ckpt_lines)
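
The desynchronization this test exercises is easy to reproduce standalone (a sketch assuming a scratch file; `f.tell()` in text mode returns an opaque, byte-derived cookie rather than a character count):

```python
import io

# Write 6 raw bytes, then read them back in text mode, where universal
# newline translation collapses each \r\n into \n.
with io.open('scratch.txt', 'w', newline='') as f:
    f.write(u'a\r\nb\r\n')            # 6 characters, 6 bytes on disk

with io.open('scratch.txt', 'r') as f:
    data = f.read()
    print(len(data))                  # 4 -- u'a\nb\n'
    print(f.tell())                   # 6 under CPython, not 4
```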

def testReadWithOffset(self):
temp_dir = self.get_temp_dir()