From f9203ff97e69ddab5dd10ee40b787cc39a9a1b08 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Mon, 24 Jan 2022 16:15:31 -0800 Subject: [PATCH 1/4] core: add --detect_file_replacement flag and plumbing --- .../backend/event_processing/data_ingester.py | 1 + .../plugin_event_accumulator.py | 26 ++++++++++++++---- .../plugin_event_multiplexer.py | 6 +++++ tensorboard/plugins/core/core_plugin.py | 27 +++++++++++++++++++ tensorboard/program.py | 7 +++++ tensorboard/program_test.py | 2 ++ 6 files changed, 64 insertions(+), 5 deletions(-) diff --git a/tensorboard/backend/event_processing/data_ingester.py b/tensorboard/backend/event_processing/data_ingester.py index 3eb520c502..52ec9e3275 100644 --- a/tensorboard/backend/event_processing/data_ingester.py +++ b/tensorboard/backend/event_processing/data_ingester.py @@ -69,6 +69,7 @@ def __init__(self, flags): purge_orphaned_data=flags.purge_orphaned_data, max_reload_threads=flags.max_reload_threads, event_file_active_filter=_get_event_file_active_filter(flags), + detect_file_replacement=flags.detect_file_replacement, ) self._data_provider = data_provider.MultiplexerDataProvider( self._multiplexer, flags.logdir or flags.logdir_spec diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator.py b/tensorboard/backend/event_processing/plugin_event_accumulator.py index 7e2bb206f7..3ff4c1e041 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator.py @@ -99,6 +99,7 @@ def __init__( tensor_size_guidance=None, purge_orphaned_data=True, event_file_active_filter=None, + detect_file_replacement=None, ): """Construct the `EventAccumulator`. @@ -122,6 +123,9 @@ def __init__( event_file_active_filter: Optional predicate for determining whether an event file latest load timestamp should be considered active. If passed, this will enable multifile directory loading. + detect_file_replacement: Optional boolean; if True, event file loading + will try to detect when a file has been replaced with a new version + that contains additional data, by monitoring the file size. """ size_guidance = dict(size_guidance or DEFAULT_SIZE_GUIDANCE) sizes = {} @@ -155,7 +159,9 @@ def __init__( self._plugin_tag_lock = threading.Lock() self.path = path - self._generator = _GeneratorFromPath(path, event_file_active_filter) + self._generator = _GeneratorFromPath( + path, event_file_active_filter, detect_file_replacement + ) self._generator_mutex = threading.Lock() self.purge_orphaned_data = purge_orphaned_data @@ -639,23 +645,33 @@ def _GetPurgeMessage( ) -def _GeneratorFromPath(path, event_file_active_filter=None): +def _GeneratorFromPath( + path, event_file_active_filter=None, detect_file_replacement=None +): """Create an event generator for file or directory at given path string.""" if not path: raise ValueError("path must be a valid string") if io_wrapper.IsSummaryEventsFile(path): - return event_file_loader.EventFileLoader(path) + return event_file_loader.EventFileLoader(path, detect_file_replacement) elif event_file_active_filter: + loader_factory = ( + lambda path: event_file_loader.TimestampedEventFileLoader( + path, detect_file_replacement + ) + ) return directory_loader.DirectoryLoader( path, - event_file_loader.TimestampedEventFileLoader, + loader_factory, path_filter=io_wrapper.IsSummaryEventsFile, active_filter=event_file_active_filter, ) else: + loader_factory = lambda path: event_file_loader.EventFileLoader( + path, detect_file_replacement + ) return directory_watcher.DirectoryWatcher( path, - event_file_loader.EventFileLoader, + loader_factory, io_wrapper.IsSummaryEventsFile, ) diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer.py b/tensorboard/backend/event_processing/plugin_event_multiplexer.py index 891985cafa..7dd96738f3 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer.py @@ -77,6 +77,7 @@ def __init__( purge_orphaned_data=True, max_reload_threads=None, event_file_active_filter=None, + detect_file_replacement=None, ): """Constructor for the `EventMultiplexer`. @@ -98,6 +99,9 @@ def __init__( event_file_active_filter: Optional predicate for determining whether an event file latest load timestamp should be considered active. If passed, this will enable multifile directory loading. + detect_file_replacement: Optional boolean; if True, event file loading + will try to detect when a file has been replaced with a new version + that contains additional data, by monitoring the file size. """ logger.info("Event Multiplexer initializing.") self._accumulators_mutex = threading.Lock() @@ -111,6 +115,7 @@ def __init__( self.purge_orphaned_data = purge_orphaned_data self._max_reload_threads = max_reload_threads or 1 self._event_file_active_filter = event_file_active_filter + self._detect_file_replacement = detect_file_replacement if run_path_map is not None: logger.info( "Event Multplexer doing initialization load for %s", @@ -159,6 +164,7 @@ def AddRun(self, path, name=None): tensor_size_guidance=self._tensor_size_guidance, purge_orphaned_data=self.purge_orphaned_data, event_file_active_filter=self._event_file_active_filter, + detect_file_replacement=self._detect_file_replacement, ) self._accumulators[name] = accumulator self._paths[name] = path diff --git a/tensorboard/plugins/core/core_plugin.py b/tensorboard/plugins/core/core_plugin.py index 39e673daa8..882f1c0b19 100644 --- a/tensorboard/plugins/core/core_plugin.py +++ b/tensorboard/plugins/core/core_plugin.py @@ -646,6 +646,26 @@ def define_flags(self, parser): means keep all samples of that type. For instance "scalars=500,images=0" keeps 500 scalars and all images. Most users should not need to set this flag.\ +""", + ) + + parser.add_argument( + "--detect_file_replacement", + metavar="BOOL", + # Custom str-to-bool converter since regular bool() doesn't work. + type=lambda v: {"true": True, "false": False}.get(v.lower(), v), + choices=[True, False], + default=None, + help="""\ +[experimental] If true, this enables experimental support for detecting when +event files are replaced with new versions that contain additional data. This is +not needed in the normal case where new data is either appended to an existing +file or written to a brand new file, but it arises, for example, when using +rsync without the --inplace option, in which new versions of the original file +are first written to a temporary file, then swapped into the final location. + +This option is currently incompatible with --load_fast=true, and if passed will +disable fast-loading mode. (default: false)\ """, ) @@ -683,6 +703,13 @@ def fix_flags(self, flags): ) elif flags.host is not None and flags.bind_all: raise FlagsError("Must not specify both --host and --bind_all.") + elif ( + flags.load_fast == "true" and flags.detect_file_replacement is True + ): + raise FlagsError( + "Must not specify both --load_fast=true and" + "--detect_file_replacement=true" + ) flags.path_prefix = flags.path_prefix.rstrip("/") if flags.path_prefix and not flags.path_prefix.startswith("/"): diff --git a/tensorboard/program.py b/tensorboard/program.py index 5914dc00b7..257ac42eac 100644 --- a/tensorboard/program.py +++ b/tensorboard/program.py @@ -491,6 +491,13 @@ def _should_use_data_server(flags): "paths; falling back to slower Python-only load path." ) return False + if flags.detect_file_replacement is True: + logger.info( + "Note: --detect_file_replacement=true is not supported with " + "--load_fast behavior; falling back to slower Python-only load " + "path." + ) + return False return True diff --git a/tensorboard/program_test.py b/tensorboard/program_test.py index 2707f879a8..cf5eaa8663 100644 --- a/tensorboard/program_test.py +++ b/tensorboard/program_test.py @@ -86,6 +86,7 @@ def test_should_use_data_server(self): def f(**kwargs): kwargs.setdefault("logdir", "") kwargs.setdefault("logdir_spec", "") + kwargs.setdefault("detect_file_replacement", None) flags = argparse.Namespace() for k, v in kwargs.items(): setattr(flags, k, v) @@ -96,6 +97,7 @@ def f(**kwargs): self.assertTrue(f(logdir="logs/mnist/")) self.assertTrue(f(logdir="gs://logs")) self.assertFalse(f(logdir="notgs://logs")) + self.assertFalse(f(logdir="foo", detect_file_replacement=True)) class WerkzeugServerTest(tb_test.TestCase): From 47440cff07a5b1ce1196bdc53a0d1f7562f219c2 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Mon, 31 Jan 2022 17:45:25 -0800 Subject: [PATCH 2/4] core_plugin_test.py: add load_fast --- tensorboard/plugins/core/core_plugin_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tensorboard/plugins/core/core_plugin_test.py b/tensorboard/plugins/core/core_plugin_test.py index 1b78a28f1d..69e024e916 100644 --- a/tensorboard/plugins/core/core_plugin_test.py +++ b/tensorboard/plugins/core/core_plugin_test.py @@ -53,6 +53,7 @@ def __init__( grpc_data_provider="", host=None, inspect=False, + load_fast="auto", logdir="", logdir_spec="", path_prefix="", @@ -66,6 +67,7 @@ def __init__( self.grpc_data_provider = grpc_data_provider self.host = host self.inspect = inspect + self.load_fast = load_fast self.logdir = logdir self.logdir_spec = logdir_spec self.path_prefix = path_prefix From a5ceedab378e0fcc8a3ef6cdd839269c8ad26b3c Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Mon, 31 Jan 2022 17:19:32 -0800 Subject: [PATCH 3/4] ignore unrecognized accumulator kwargs in multiplexer test --- .../event_processing/plugin_event_multiplexer_test.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py index cf5769d1d8..30faa2bc63 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py @@ -94,13 +94,9 @@ def Reload(self): def _GetFakeAccumulator( path, - size_guidance=None, - tensor_size_guidance=None, - purge_orphaned_data=None, - event_file_active_filter=None, + **unused_kwargs, ): - del size_guidance, tensor_size_guidance, purge_orphaned_data # Unused. - del event_file_active_filter # unused + del unused_kwargs return _FakeAccumulator(path) From d5a703641b1ec733be2aa5bf495371ddd520e57e Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Mon, 31 Jan 2022 17:22:48 -0800 Subject: [PATCH 4/4] data_ingester_test.py add detect_file_replacement flag --- tensorboard/backend/event_processing/data_ingester_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tensorboard/backend/event_processing/data_ingester_test.py b/tensorboard/backend/event_processing/data_ingester_test.py index 8cbfc291f0..380841d76a 100644 --- a/tensorboard/backend/event_processing/data_ingester_test.py +++ b/tensorboard/backend/event_processing/data_ingester_test.py @@ -32,6 +32,7 @@ class FakeFlags(object): def __init__( self, + detect_file_replacement=None, generic_data="auto", logdir="", logdir_spec="", @@ -45,6 +46,7 @@ def __init__( samples_per_plugin=None, window_title="", ): + self.detect_file_replacement = detect_file_replacement self.generic_data = generic_data self.logdir = logdir self.logdir_spec = logdir_spec