diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c7e865e367..8b465329b6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -110,7 +110,9 @@ jobs:
       - name: 'Bazel: run manual tests'
         run: |
           bazel test //tensorboard/compat/tensorflow_stub:gfile_s3_test &&
-          bazel test //tensorboard/summary/writer:event_file_writer_s3_test
+          bazel test //tensorboard/summary/writer:event_file_writer_s3_test &&
+          bazel test //tensorboard/compat/tensorflow_stub:gfile_fsspec_test &&
+          bazel test //tensorboard/summary/writer:event_file_writer_fsspec_test
 
   build-data-server-pip:
     runs-on: ${{ matrix.platform }}
diff --git a/tensorboard/BUILD b/tensorboard/BUILD
index c874d8e02c..e901ba59c3 100644
--- a/tensorboard/BUILD
+++ b/tensorboard/BUILD
@@ -488,6 +488,13 @@ py_library(name = "expect_requests_installed")
 # optional dependency.
 py_library(name = "expect_pandas_installed")
 
+# This is a dummy rule used as an fsspec dependency in open-source.
+# We expect fsspec to already be installed on the system, e.g. via
+# `pip install fsspec`.
+# NOTE: Unlike other parallel dependencies in this file, fsspec is an
+# optional dependency.
+py_library(name = "expect_fsspec_installed")
+
 py_library(
     name = "data_compat",
     srcs = ["data_compat.py"],
diff --git a/tensorboard/compat/tensorflow_stub/BUILD b/tensorboard/compat/tensorflow_stub/BUILD
index aeb66c8b98..562d68c7bd 100644
--- a/tensorboard/compat/tensorflow_stub/BUILD
+++ b/tensorboard/compat/tensorflow_stub/BUILD
@@ -16,6 +16,7 @@ py_library(
     srcs_version = "PY3",
     deps = [
         "//tensorboard:expect_absl_flags_installed",
+        "//tensorboard:expect_fsspec_installed",
         "//tensorboard:expect_numpy_installed",
         "//tensorboard/compat/proto:protos_all_py_pb2",
     ],
@@ -59,3 +60,16 @@ py_test(
         "//tensorboard:test",
     ],
 )
+
+py_test(
+    name = "gfile_fsspec_test",
+    size = "small",
+    srcs = ["io/gfile_fsspec_test.py"],
+    srcs_version = "PY3",
+    tags = ["support_notf"],
+    deps = [
+        ":tensorflow_stub",
+        "//tensorboard:expect_fsspec_installed",
+        "//tensorboard:test",
+    ],
+)
diff --git a/tensorboard/compat/tensorflow_stub/io/gfile.py b/tensorboard/compat/tensorflow_stub/io/gfile.py
index 15e6a82f3a..bb6293f26d 100644
--- a/tensorboard/compat/tensorflow_stub/io/gfile.py
+++ b/tensorboard/compat/tensorflow_stub/io/gfile.py
@@ -24,6 +24,7 @@
 import glob as py_glob
 import io
 import os
+import os.path
 import sys
 import tempfile
 
@@ -35,6 +36,13 @@
 except ImportError:
     S3_ENABLED = False
 
+try:
+    import fsspec
+
+    FSSPEC_ENABLED = True
+except ImportError:
+    FSSPEC_ENABLED = False
+
 if sys.version_info < (3, 0):
     # In Python 2 FileExistsError is not defined and the
     # error manifests it as OSError.
@@ -69,6 +77,8 @@ def get_filesystem(filename):
     if index >= 0:
         prefix = filename[:index]
     fs = _REGISTERED_FILESYSTEMS.get(prefix, None)
+    if fs is None:
+        fs = _get_fsspec_filesystem(filename)
     if fs is None:
         raise ValueError("No recognized filesystem for prefix %s" % prefix)
     return fs
@@ -401,6 +411,242 @@ def stat(self, filename):
             raise
 
 
+class FSSpecFileSystem(object):
+    """Provides filesystem access via fsspec.
+
+    The current gfile interface doesn't map perfectly to the fsspec
+    interface, leading to some notable inefficiencies.
+
+    * Reads and writes to files cause the file to be reopened each time,
+      which can cause a performance hit when accessing local file systems.
+    * walk doesn't use the native fsspec walk function, so performance may
+      be slower.
+
+    See https://github.com/tensorflow/tensorboard/issues/5286 for more
+    info on limitations.
+    """
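+
+    # fsspec supports "chained" URLs such as "simplecache::file:///tmp/log":
+    # every component before the last "::" names a wrapper filesystem, and
+    # only the final component may carry a "://" protocol and a path. The
+    # constants below are used to parse and validate such URLs.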
+ """ + + SEPARATOR = "://" + CHAIN_SEPARATOR = "::" + + def _validate_path(self, path): + parts = path.split(self.CHAIN_SEPARATOR) + for part in parts[:-1]: + if self.SEPARATOR in part: + raise errors.InvalidArgumentError( + None, + None, + "fsspec URL must only have paths in the last chained filesystem, got {}".format( + path + ), + ) + + def _translate_errors(func): + def func_wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except FileNotFoundError as e: + raise errors.NotFoundError(None, None, str(e)) + + return func_wrapper + + def _fs_path(self, filename): + if isinstance(filename, bytes): + filename = filename.decode("utf-8") + self._validate_path(filename) + + fs, path = fsspec.core.url_to_fs(filename) + return fs, path + + @_translate_errors + def exists(self, filename): + """Determines whether a path exists or not.""" + fs, path = self._fs_path(filename) + return fs.exists(path) + + def _join(self, sep, paths): + """ + _join joins the paths with the given separator. + """ + result = [] + for part in paths: + if part.startswith(sep): + result = [] + if result and result[-1] and not result[-1].endswith(sep): + result.append(sep) + result.append(part) + return "".join(result) + + @_translate_errors + def join(self, path, *paths): + """Join paths with a slash.""" + self._validate_path(path) + + before, sep, last_path = path.rpartition(self.CHAIN_SEPARATOR) + chain_prefix = before + sep + protocol, path = fsspec.core.split_protocol(last_path) + fs = fsspec.get_filesystem_class(protocol) + if protocol: + chain_prefix += protocol + self.SEPARATOR + return chain_prefix + self._join(fs.sep, ((path,) + paths)) + + @_translate_errors + def read(self, filename, binary_mode=False, size=None, continue_from=None): + """Reads contents of a file to a string. + + Args: + filename: string, a path + binary_mode: bool, read as binary if True, otherwise text + size: int, number of bytes or characters to read, otherwise + read all the contents of the file (from the continuation + marker, if present). + continue_from: An opaque value returned from a prior invocation of + `read(...)` marking the last read position, so that reading + may continue from there. Otherwise read from the beginning. + + Returns: + A tuple of `(data, continuation_token)` where `data' provides either + bytes read from the file (if `binary_mode == true`) or the decoded + string representation thereof (otherwise), and `continuation_token` + is an opaque value that can be passed to the next invocation of + `read(...) ' in order to continue from the last read position. + """ + fs, path = self._fs_path(filename) + + mode = "rb" if binary_mode else "r" + encoding = None if binary_mode else "utf8" + if not exists(filename): + raise errors.NotFoundError( + None, None, "Not Found: " + compat.as_text(filename) + ) + with fs.open(path, mode, encoding=encoding) as f: + if continue_from is not None: + if not f.seekable(): + raise errors.InvalidArgumentError( + None, + None, + "{} is not seekable".format(filename), + ) + offset = continue_from.get("opaque_offset", None) + if offset is not None: + f.seek(offset) + + data = f.read(size) + # The new offset may not be `offset + len(data)`, due to decoding + # and newline translation. + # So, just measure it in whatever terms the underlying stream uses. 
+
+    @_translate_errors
+    def read(self, filename, binary_mode=False, size=None, continue_from=None):
+        """Reads contents of a file to a string.
+
+        Args:
+            filename: string, a path
+            binary_mode: bool, read as binary if True, otherwise text
+            size: int, number of bytes or characters to read, otherwise
+                read all the contents of the file (from the continuation
+                marker, if present).
+            continue_from: An opaque value returned from a prior invocation of
+                `read(...)` marking the last read position, so that reading
+                may continue from there. Otherwise read from the beginning.
+
+        Returns:
+            A tuple of `(data, continuation_token)` where `data` provides either
+            bytes read from the file (if `binary_mode == True`) or the decoded
+            string representation thereof (otherwise), and `continuation_token`
+            is an opaque value that can be passed to the next invocation of
+            `read(...)` in order to continue from the last read position.
+        """
+        fs, path = self._fs_path(filename)
+
+        mode = "rb" if binary_mode else "r"
+        encoding = None if binary_mode else "utf8"
+        if not exists(filename):
+            raise errors.NotFoundError(
+                None, None, "Not Found: " + compat.as_text(filename)
+            )
+        with fs.open(path, mode, encoding=encoding) as f:
+            if continue_from is not None:
+                if not f.seekable():
+                    raise errors.InvalidArgumentError(
+                        None,
+                        None,
+                        "{} is not seekable".format(filename),
+                    )
+                offset = continue_from.get("opaque_offset", None)
+                if offset is not None:
+                    f.seek(offset)
+
+            data = f.read(size)
+            # The new offset may not be `offset + len(data)`, due to decoding
+            # and newline translation, so just measure it in whatever terms
+            # the underlying stream uses.
+            continuation_token = (
+                {"opaque_offset": f.tell()} if f.seekable() else {}
+            )
+            return (data, continuation_token)
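+
+    # Callers can tail a growing file by threading the continuation token
+    # through successive calls, e.g. (illustrative only):
+    #   data, token = fs.read(path, binary_mode=True)
+    #   more, token = fs.read(path, binary_mode=True, continue_from=token)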
+ """ + if not FSSPEC_ENABLED: + return None + + segment = filename.partition(FSSpecFileSystem.CHAIN_SEPARATOR)[0] + protocol = segment.partition(FSSpecFileSystem.SEPARATOR)[0] + if fsspec.get_filesystem_class(protocol): + return _FSSPEC_FILESYSTEM + else: + return None + + register_filesystem("", LocalFileSystem()) if S3_ENABLED: register_filesystem("s3", S3FileSystem()) @@ -514,6 +760,7 @@ def write(self, file_content): # write the first chunk to truncate file if it already exists self.fs.write(self.filename, file_content, self.binary_mode) self.write_started = True + else: # append the later chunks self.fs.append(self.filename, file_content, self.binary_mode) diff --git a/tensorboard/compat/tensorflow_stub/io/gfile_fsspec_test.py b/tensorboard/compat/tensorflow_stub/io/gfile_fsspec_test.py new file mode 100644 index 0000000000..cd519eab48 --- /dev/null +++ b/tensorboard/compat/tensorflow_stub/io/gfile_fsspec_test.py @@ -0,0 +1,557 @@ +# Copyright 2021 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + + +import posixpath + +from tensorboard import test as tb_test +from tensorboard.compat.tensorflow_stub import errors +from tensorboard.compat.tensorflow_stub.io import gfile + +import fsspec + + +class GFileFSSpecTest(tb_test.TestCase): + def get_temp_dir(self): + return "file://" + super().get_temp_dir() + + def testExists(self): + temp_dir = self.get_temp_dir() + self._CreateDeepDirectoryStructure(temp_dir) + ckpt_path = posixpath.join(temp_dir, "model.ckpt") + self.assertTrue(gfile.exists(temp_dir)) + self.assertTrue(gfile.exists(ckpt_path)) + + def testGlob(self): + temp_dir = self.get_temp_dir() + self._CreateDeepDirectoryStructure(temp_dir) + expected = [ + "foo", + "bar", + "a.tfevents.1", + "model.ckpt", + "quuz", + "waldo", + ] + expected_listing = [posixpath.join(temp_dir, f) for f in expected] + gotten_listing = gfile.glob(posixpath.join(temp_dir, "*")) + self.assertCountEqual( + expected_listing, + gotten_listing, + "Files must match. Expected %r. Got %r." 
diff --git a/tensorboard/compat/tensorflow_stub/io/gfile_fsspec_test.py b/tensorboard/compat/tensorflow_stub/io/gfile_fsspec_test.py
new file mode 100644
index 0000000000..cd519eab48
--- /dev/null
+++ b/tensorboard/compat/tensorflow_stub/io/gfile_fsspec_test.py
@@ -0,0 +1,557 @@
+# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+
+import posixpath
+
+from tensorboard import test as tb_test
+from tensorboard.compat.tensorflow_stub import errors
+from tensorboard.compat.tensorflow_stub.io import gfile
+
+import fsspec
+
+
+class GFileFSSpecTest(tb_test.TestCase):
+    def get_temp_dir(self):
+        return "file://" + super().get_temp_dir()
+
+    def testExists(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model.ckpt")
+        self.assertTrue(gfile.exists(temp_dir))
+        self.assertTrue(gfile.exists(ckpt_path))
+
+    def testGlob(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        expected = [
+            "foo",
+            "bar",
+            "a.tfevents.1",
+            "model.ckpt",
+            "quuz",
+            "waldo",
+        ]
+        expected_listing = [posixpath.join(temp_dir, f) for f in expected]
+        gotten_listing = gfile.glob(posixpath.join(temp_dir, "*"))
+        self.assertCountEqual(
+            expected_listing,
+            gotten_listing,
+            "Files must match. Expected %r. Got %r."
+            % (expected_listing, gotten_listing),
+        )
+
+    def testIsdir(self):
+        temp_dir = self.get_temp_dir()
+        self.assertTrue(gfile.isdir(temp_dir))
+
+    def testListdir(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        expected_files = (
+            "foo",
+            "bar",
+            "quuz",
+            "a.tfevents.1",
+            "model.ckpt",
+            "waldo",
+        )
+        got = gfile.listdir(temp_dir)
+        self.assertCountEqual(expected_files, got)
+
+    def testMakeDirs(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        new_dir = posixpath.join(temp_dir, "newdir", "subdir", "subsubdir")
+        gfile.makedirs(new_dir)
+        self.assertTrue(gfile.isdir(new_dir))
+
+    def testMakeDirsAlreadyExists(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        new_dir = posixpath.join(temp_dir, "bar", "baz")
+        gfile.makedirs(new_dir)
+
+    def testWalk(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        expected = [
+            [
+                "",
+                [
+                    "a.tfevents.1",
+                    "model.ckpt",
+                ],
+            ],
+            ["foo", []],
+            [
+                "bar",
+                [
+                    "b.tfevents.1",
+                    "red_herring.txt",
+                ],
+            ],
+            [
+                "bar/baz",
+                [
+                    "c.tfevents.1",
+                    "d.tfevents.1",
+                ],
+            ],
+            [
+                "bar/quux",
+                [
+                    "some_flume_output.txt",
+                    "some_more_flume_output.txt",
+                ],
+            ],
+            [
+                "quuz",
+                [
+                    "e.tfevents.1",
+                ],
+            ],
+            [
+                "quuz/garply",
+                [
+                    "f.tfevents.1",
+                ],
+            ],
+            [
+                "quuz/garply/corge",
+                [
+                    "g.tfevents.1",
+                ],
+            ],
+            [
+                "quuz/garply/grault",
+                [
+                    "h.tfevents.1",
+                ],
+            ],
+            ["waldo", []],
+            [
+                "waldo/fred",
+                [
+                    "i.tfevents.1",
+                ],
+            ],
+        ]
+        for pair in expected:
+            # If this is not the top-level directory, prepend the high-level
+            # directory.
+            pair[0] = (
+                posixpath.join(temp_dir, pair[0].replace("/", posixpath.sep))
+                if pair[0]
+                else temp_dir
+            )
+        gotten = gfile.walk(temp_dir)
+        self._CompareFilesPerSubdirectory(expected, gotten)
+
+    def testStat(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model.ckpt")
+        ckpt_content = "asdfasdfasdffoobarbuzz"
+        with fsspec.open(ckpt_path, "w") as f:
+            f.write(ckpt_content)
+        ckpt_stat = gfile.stat(ckpt_path)
+        self.assertEqual(ckpt_stat.length, len(ckpt_content))
+        bad_ckpt_path = posixpath.join(temp_dir, "bad_model.ckpt")
+        with self.assertRaises(errors.NotFoundError):
+            gfile.stat(bad_ckpt_path)
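+
+    # Note: the stub's StatData record carries only a length field (see
+    # FSSpecFileSystem.stat, which wraps fs.size), so stat assertions here
+    # are limited to file sizes rather than timestamps or permissions.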
+
+    def testRead(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model.ckpt")
+        ckpt_content = "asdfasdfasdffoobarbuzz"
+        with fsspec.open(ckpt_path, "w") as f:
+            f.write(ckpt_content)
+        with gfile.GFile(ckpt_path, "r") as f:
+            f.buff_chunk_size = 4  # Test buffering by reducing chunk size
+            ckpt_read = f.read()
+            self.assertEqual(ckpt_content, ckpt_read)
+
+    def testTextMode(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model.ckpt")
+
+        # Write out newlines as given (i.e., \r\n) regardless of OS, so as
+        # to test translation on read.
+        with fsspec.open(ckpt_path, "w", newline="") as f:
+            data = "asdf\nasdf\nasdf\n"
+            f.write(data)
+        with gfile.GFile(ckpt_path, "r") as f:
+            f.buff_chunk_size = 6  # Test buffering by reducing chunk size
+            f.read()
+        # TODO(d4l3k): test seeking behavior once
+        # https://github.com/intake/filesystem_spec/pull/743 is fixed
+
+    def testReadWithOffset(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model.ckpt")
+        ckpt_content = "asdfasdfasdffoobarbuzz"
+        ckpt_b_content = b"asdfasdfasdffoobarbuzz"
+        with fsspec.open(ckpt_path, "w") as f:
+            f.write(ckpt_content)
+        with gfile.GFile(ckpt_path, "rb") as f:
+            f.buff_chunk_size = 4  # Test buffering by reducing chunk size
+            ckpt_read = f.read(12)
+            self.assertEqual(b"asdfasdfasdf", ckpt_read)
+            ckpt_read = f.read(6)
+            self.assertEqual(b"foobar", ckpt_read)
+            ckpt_read = f.read(1)
+            self.assertEqual(b"b", ckpt_read)
+            ckpt_read = f.read()
+            self.assertEqual(b"uzz", ckpt_read)
+            ckpt_read = f.read(1000)
+            self.assertEqual(b"", ckpt_read)
+        with gfile.GFile(ckpt_path, "rb") as f:
+            ckpt_read = f.read()
+            self.assertEqual(ckpt_b_content, ckpt_read)
+
+    def testWrite(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model2.ckpt")
+        ckpt_content = "asdfasdfasdffoobarbuzz"
+        with gfile.GFile(ckpt_path, "w") as f:
+            f.write(ckpt_content)
+        with fsspec.open(ckpt_path, "r") as f:
+            ckpt_read = f.read()
+            self.assertEqual(ckpt_content, ckpt_read)
+
+    def testOverwrite(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model2.ckpt")
+        ckpt_content = "asdfasdfasdffoobarbuzz"
+        with gfile.GFile(ckpt_path, "w") as f:
+            f.write("original")
+        with gfile.GFile(ckpt_path, "w") as f:
+            f.write(ckpt_content)
+        with fsspec.open(ckpt_path, "r") as f:
+            ckpt_read = f.read()
+            self.assertEqual(ckpt_content, ckpt_read)
+
+    def testWriteMultiple(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model2.ckpt")
+        ckpt_content = "asdfasdfasdffoobarbuzz" * 5
+        with gfile.GFile(ckpt_path, "w") as f:
+            for i in range(0, len(ckpt_content), 3):
+                f.write(ckpt_content[i : i + 3])
+                # Test periodic flushing of the file
+                if i % 9 == 0:
+                    f.flush()
+        with fsspec.open(ckpt_path, "r") as f:
+            ckpt_read = f.read()
+            self.assertEqual(ckpt_content, ckpt_read)
+
+    def testWriteEmpty(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model2.ckpt")
+        ckpt_content = ""
+        with gfile.GFile(ckpt_path, "w") as f:
+            f.write(ckpt_content)
+        with fsspec.open(ckpt_path, "r") as f:
+            ckpt_read = f.read()
+            self.assertEqual(ckpt_content, ckpt_read)
+
+    def testWriteBinary(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model2.ckpt")
+        ckpt_content = b"asdfasdfasdffoobarbuzz"
+        with gfile.GFile(ckpt_path, "wb") as f:
+            f.write(ckpt_content)
+        with fsspec.open(ckpt_path, "rb") as f:
+            ckpt_read = f.read()
+            self.assertEqual(ckpt_content, ckpt_read)
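+
+    # Like testWriteMultiple above, the binary variant below exercises
+    # GFile's buffered writer: the first flushed chunk goes through
+    # FSSpecFileSystem.write(), truncating any existing file, and later
+    # chunks go through append(), which reopens the file on each call (see
+    # the FSSpecFileSystem docstring for the performance caveat).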
+
+    def testWriteMultipleBinary(self):
+        temp_dir = self.get_temp_dir()
+        self._CreateDeepDirectoryStructure(temp_dir)
+        ckpt_path = posixpath.join(temp_dir, "model2.ckpt")
+        ckpt_content = b"asdfasdfasdffoobarbuzz" * 5
+        with gfile.GFile(ckpt_path, "wb") as f:
+            for i in range(0, len(ckpt_content), 3):
+                f.write(ckpt_content[i : i + 3])
+                # Test periodic flushing of the file
+                if i % 9 == 0:
+                    f.flush()
+        with fsspec.open(ckpt_path, "rb") as f:
+            ckpt_read = f.read()
+            self.assertEqual(ckpt_content, ckpt_read)
+
+    def _CreateDeepDirectoryStructure(self, top_directory):
+        """Creates a reasonably deep structure of subdirectories with files.
+
+        Args:
+            top_directory: The file:// path of the top-level directory in
+                which to create the directory structure.
+        """
+
+        # Add a few subdirectories.
+        directory_names = (
+            # An empty directory.
+            "foo",
+            # A directory with an events file (and a text file).
+            "bar",
+            # A deeper directory with events files.
+            "bar/baz",
+            # A non-empty subdir that lacks event files (should be ignored).
+            "bar/quux",
+            # This 3-level deep set of subdirectories tests logic that replaces
+            # the full glob string with an absolute path prefix if there is
+            # only 1 subdirectory in the final mapping.
+            "quuz/garply",
+            "quuz/garply/corge",
+            "quuz/garply/grault",
+            # A directory that lacks events files, but contains a subdirectory
+            # with events files (first level should be ignored, second level
+            # should be included).
+            "waldo",
+            "waldo/fred",
+        )
+        for directory_name in directory_names:
+            path = posixpath.join(top_directory, directory_name)
+            fs, _, paths = fsspec.get_fs_token_paths(path)
+            fs.makedirs(paths[0])
+
+        # Add a few files to the directory.
+        file_names = (
+            "a.tfevents.1",
+            "model.ckpt",
+            "bar/b.tfevents.1",
+            "bar/red_herring.txt",
+            "bar/baz/c.tfevents.1",
+            "bar/baz/d.tfevents.1",
+            "bar/quux/some_flume_output.txt",
+            "bar/quux/some_more_flume_output.txt",
+            "quuz/e.tfevents.1",
+            "quuz/garply/f.tfevents.1",
+            "quuz/garply/corge/g.tfevents.1",
+            "quuz/garply/grault/h.tfevents.1",
+            "waldo/fred/i.tfevents.1",
+        )
+        for file_name in file_names:
+            with fsspec.open(
+                posixpath.join(top_directory, file_name), "wb"
+            ) as f:
+                f.write(b"")
+
+    def _CompareFilesPerSubdirectory(self, expected, gotten):
+        """Compares iterables of (subdirectory path, list of absolute paths).
+
+        Args:
+            expected: The expected iterable of 2-tuples.
+            gotten: The gotten iterable of 3-tuples, as yielded by gfile.walk.
+        """
+        expected_directory_to_files = {
+            result[0]: list(result[1]) for result in expected
+        }
+        gotten_directory_to_files = {
+            # Note that we ignore subdirectories and just compare files.
+            result[0]: list(result[2])
+            for result in gotten
+        }
+        self.assertCountEqual(
+            expected_directory_to_files.keys(),
+            gotten_directory_to_files.keys(),
+        )
+
+        for subdir, expected_listing in expected_directory_to_files.items():
+            gotten_listing = gotten_directory_to_files[subdir]
+            self.assertCountEqual(
+                expected_listing,
+                gotten_listing,
+                "Files for subdir %r must match. Expected %r. Got %r."
+                % (subdir, expected_listing, gotten_listing),
+            )
+
+    def testNonExistentFilesystem(self):
+        with self.assertRaises(ValueError):
+            gfile.get_filesystem("nonexistent::blah://filesystem")
+
+    def testExistence(self):
+        self.assertIsInstance(
+            gfile.get_filesystem("simplecache::nonexistent::file://blah/blah"),
+            gfile.FSSpecFileSystem,
+        )
+
+    def testJoin(self):
+        fs = gfile.get_filesystem("file://foo")
+
+        # relative
+        self.assertEqual(
+            fs.join("bar", "foo", "hi"),
+            "bar/foo/hi",
+        )
+        # absolute with protocol
+        self.assertEqual(
+            fs.join("file:///bar", "foo", "hi"),
+            "file:///bar/foo/hi",
+        )
+        # empty path element
+        self.assertEqual(
+            fs.join("file:///bar", "", "hi"),
+            "file:///bar/hi",
+        )
+        # relative with protocol
+        self.assertEqual(
+            fs.join("file://bar", "foo"),
+            "file://bar/foo",
+        )
+        # chained relative with protocol
+        self.assertEqual(
+            fs.join("simplecache::file://bucket/some/path", "bar"),
+            "simplecache::file://bucket/some/path/bar",
+        )
+        # chained absolute without protocol
+        self.assertEqual(
+            fs.join("simplecache::/some/path", "bar"),
+            "simplecache::/some/path/bar",
+        )
+        # absolute second part
+        self.assertEqual(
+            fs.join("simplecache::/some/path", "/bar"),
+            "simplecache::/bar",
+        )
+        # absolute second part with protocol
+        self.assertEqual(
+            fs.join("simplecache::file:///some/path", "/bar"),
+            "simplecache::file:///bar",
+        )
+        # trailing /
+        self.assertEqual(
+            fs.join("simplecache::/some/path/", "bar/", "foo"),
+            "simplecache::/some/path/bar/foo",
+        )
+        # trailing slash on the last element
+        self.assertEqual(
+            fs.join("hello", "world/"),
+            "hello/world/",
+        )
+        # empty path at the end
+        self.assertEqual(
+            fs.join("hello", "world", ""),
+            "hello/world/",
+        )
+        # absolute path in the middle
+        self.assertEqual(
+            fs.join("hello", "", "world", "", "/wow"),
+            "/wow",
+        )
+
+    def testComplexChaining(self):
+        path = "simplecache::zip://*::file://banana/bar"
+        with self.assertRaisesRegexp(
+            errors.InvalidArgumentError,
+            "fsspec URL must only have paths in the last chained filesystem",
+        ):
+            gfile.exists(path)
+
+    def testGlobChaining(self):
+        """Tests glob with chained file systems."""
+        temp_dir = self.get_temp_dir()
+        on_disk = temp_dir.split("://")[1]
+
+        with open(posixpath.join(on_disk, "foo.txt"), "wb") as myfile:
+            myfile.write(b"foo")
+
+        with open(posixpath.join(on_disk, "bar.txt"), "wb") as myfile:
+            myfile.write(b"bar")
+
+        foo_raw = posixpath.join(temp_dir, "foo.txt")
+        foo_cached = "simplecache::" + foo_raw
+
+        self.assertTrue(gfile.exists(foo_raw))
+        self.assertTrue(gfile.exists(foo_cached))
+
+        cache_dir = "simplecache::" + temp_dir
+        files = gfile.glob(posixpath.join(cache_dir, "*.txt"))
+        self.assertCountEqual(
+            files,
+            [
+                posixpath.join(cache_dir, "foo.txt"),
+                posixpath.join(cache_dir, "bar.txt"),
+            ],
+        )
+ """ + temp_dir = self.get_temp_dir().split("://")[1] + + with open(posixpath.join(temp_dir, "foo.txt"), "wb") as myfile: + myfile.write(b"foo") + + with open(posixpath.join(temp_dir, "bar.txt"), "wb") as myfile: + myfile.write(b"bar") + + foo_raw = posixpath.join(temp_dir, "foo.txt") + foo_cached = "simplecache::" + foo_raw + + fs = gfile.get_filesystem("file://") + cached_fs = gfile.get_filesystem("simplecache::file://") + + self.assertTrue(fs.exists(foo_raw)) + self.assertTrue(cached_fs.exists(foo_cached)) + + cache_dir = "simplecache::" + temp_dir + files = cached_fs.glob(posixpath.join(cache_dir, "*.txt")) + self.assertCountEqual( + files, + [ + posixpath.join(cache_dir, "foo.txt"), + posixpath.join(cache_dir, "bar.txt"), + ], + ) + + def testGlobNonAbsolute(self): + """ + This tests glob with in memory file system which does not return + absolute paths from glob. + """ + fs = fsspec.filesystem("memory") + fs.mkdir("dir") + fs.touch("dir/foo.txt") + fs.touch("dir/bar.txt") + + root = "memory://dir" + + files = gfile.glob(posixpath.join(root, "*.txt")) + self.assertCountEqual( + files, + [ + posixpath.join(root, "foo.txt"), + posixpath.join(root, "bar.txt"), + ], + ) + + +if __name__ == "__main__": + tb_test.main() diff --git a/tensorboard/pip_package/requirements_dev.txt b/tensorboard/pip_package/requirements_dev.txt index c621e405c5..42f7944076 100644 --- a/tensorboard/pip_package/requirements_dev.txt +++ b/tensorboard/pip_package/requirements_dev.txt @@ -21,6 +21,8 @@ pandas~=1.0 # For gfile S3 test boto3==1.9.86 moto==1.3.7 +# For gfile fsspec test +fsspec==0.7.4 # For linting black==20.8b1 diff --git a/tensorboard/summary/writer/BUILD b/tensorboard/summary/writer/BUILD index fe6301d8de..ff742cf96b 100644 --- a/tensorboard/summary/writer/BUILD +++ b/tensorboard/summary/writer/BUILD @@ -56,6 +56,20 @@ py_test( ], ) +py_test( + name = "event_file_writer_fsspec_test", + size = "small", + srcs = ["event_file_writer_fsspec_test.py"], + main = "event_file_writer_fsspec_test.py", + srcs_version = "PY3", + tags = ["support_notf"], + deps = [ + ":writer", + "//tensorboard:expect_fsspec_installed", + "//tensorboard:test", + ], +) + py_test( name = "record_writer_test", size = "small", diff --git a/tensorboard/summary/writer/event_file_writer_fsspec_test.py b/tensorboard/summary/writer/event_file_writer_fsspec_test.py new file mode 100644 index 0000000000..038dff9529 --- /dev/null +++ b/tensorboard/summary/writer/event_file_writer_fsspec_test.py @@ -0,0 +1,87 @@ +# Copyright 2019 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/tensorboard/summary/writer/event_file_writer_fsspec_test.py b/tensorboard/summary/writer/event_file_writer_fsspec_test.py
new file mode 100644
index 0000000000..038dff9529
--- /dev/null
+++ b/tensorboard/summary/writer/event_file_writer_fsspec_test.py
@@ -0,0 +1,87 @@
+# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+"""Tests for EventFileWriter and _AsyncWriter."""
+
+
+import os
+import unittest
+
+from tensorboard.summary.writer.event_file_writer import EventFileWriter
+from tensorboard.compat.proto import event_pb2
+from tensorboard.compat.proto.summary_pb2 import Summary
+from tensorboard.compat.tensorflow_stub.pywrap_tensorflow import (
+    PyRecordReader_New,
+)
+from tensorboard.compat import tf
+from tensorboard import test as tb_test
+
+import fsspec
+
+USING_REAL_TF = tf.__version__ != "stub"
+
+
+class EventFileWriterFSSpecTest(tb_test.TestCase):
+    def get_temp_dir(self):
+        return "file://" + super().get_temp_dir()
+
+    def glob(self, path):
+        fs, _, _ = fsspec.get_fs_token_paths(path)
+        return fs.glob(path)
+
+    @unittest.skipIf(USING_REAL_TF, "Test only passes when using stub TF")
+    def test_event_file_writer_roundtrip(self):
+        _TAGNAME = "dummy"
+        _DUMMY_VALUE = 42
+        logdir = self.get_temp_dir()
+        w = EventFileWriter(logdir)
+        summary = Summary(
+            value=[Summary.Value(tag=_TAGNAME, simple_value=_DUMMY_VALUE)]
+        )
+        fakeevent = event_pb2.Event(summary=summary)
+        w.add_event(fakeevent)
+        w.close()
+        event_files = sorted(self.glob(os.path.join(logdir, "*")))
+        self.assertEqual(len(event_files), 1)
+        r = PyRecordReader_New(event_files[0])
+        r.GetNext()  # meta data, so skip
+        r.GetNext()
+        self.assertEqual(fakeevent.SerializeToString(), r.record())
+
+    @unittest.skipIf(USING_REAL_TF, "Test only passes when using stub TF")
+    def test_setting_filename_suffix_works(self):
+        logdir = self.get_temp_dir()
+
+        w = EventFileWriter(logdir, filename_suffix=".event_horizon")
+        w.close()
+        event_files = sorted(self.glob(os.path.join(logdir, "*")))
+        self.assertEqual(event_files[0].split(".")[-1], "event_horizon")
+
+    @unittest.skipIf(USING_REAL_TF, "Test only passes when using stub TF")
+    def test_async_writer_without_write(self):
+        logdir = self.get_temp_dir()
+        w = EventFileWriter(logdir)
+        w.close()
+        event_files = sorted(self.glob(os.path.join(logdir, "*")))
+        r = PyRecordReader_New(event_files[0])
+        r.GetNext()
+        s = event_pb2.Event.FromString(r.record())
+        self.assertEqual(s.file_version, "brain.Event:2")
+
+
+if __name__ == "__main__":
+    tb_test.main()
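Taken together, the patch lets both the gfile stub and the event writer target any filesystem fsspec knows about. A minimal end-to-end sketch against fsspec's in-memory filesystem (illustrative only; exact glob output format may vary by fsspec version):

```python
import fsspec

from tensorboard.compat.tensorflow_stub.io import gfile

# Ensure the directory exists on fsspec's in-memory filesystem.
fsspec.filesystem("memory").makedirs("/demo", exist_ok=True)

# Round-trip a file through the stub gfile layer; "memory://" resolves
# through the new fsspec fallback in get_filesystem().
with gfile.GFile("memory://demo/hello.txt", "w") as f:
    f.write("hello fsspec")

with gfile.GFile("memory://demo/hello.txt", "r") as f:
    assert f.read() == "hello fsspec"

print(gfile.glob("memory://demo/*.txt"))  # lists the file just written
```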